mirror of
https://github.com/netbox-community/netbox.git
synced 2026-04-01 07:03:22 +02:00
Add ChangelogMessageMixin to DeviceBulkAddComponentForm and capture changelog_message during bulk component creation. Ensure message is applied to each created component instance. Add test coverage for changelog message propagation.
1237 lines
49 KiB
Python
1237 lines
49 KiB
Python
import logging
|
|
import re
|
|
from collections import Counter
|
|
from copy import deepcopy
|
|
|
|
from django.contrib import messages
|
|
from django.contrib.contenttypes.fields import GenericForeignKey, GenericRel
|
|
from django.core.exceptions import FieldDoesNotExist, ObjectDoesNotExist, ValidationError
|
|
from django.db import IntegrityError, router, transaction
|
|
from django.db.models import ManyToManyField, ProtectedError, RestrictedError
|
|
from django.db.models.fields.reverse_related import ManyToManyRel
|
|
from django.forms import ModelMultipleChoiceField, MultipleHiddenInput
|
|
from django.http import HttpResponse
|
|
from django.shortcuts import get_object_or_404, redirect, render
|
|
from django.utils.safestring import mark_safe
|
|
from django.utils.translation import gettext as _
|
|
from mptt.models import MPTTModel
|
|
|
|
from core.exceptions import JobFailed
|
|
from core.models import ObjectType
|
|
from core.signals import clear_events
|
|
from extras.choices import CustomFieldUIEditableChoices
|
|
from extras.models import CustomField, ExportTemplate
|
|
from netbox.models.features import ChangeLoggingMixin
|
|
from netbox.object_actions import AddObject, BulkDelete, BulkEdit, BulkExport, BulkImport, BulkRename
|
|
from utilities.error_handlers import handle_protectederror
|
|
from utilities.exceptions import AbortRequest, PermissionsViolation
|
|
from utilities.export import TableExport
|
|
from utilities.forms import BulkDeleteForm, BulkRenameForm, restrict_form_fields
|
|
from utilities.forms.bulk_import import BulkImportForm
|
|
from utilities.htmx import htmx_partial
|
|
from utilities.jobs import is_background_request, process_request_as_job
|
|
from utilities.permissions import get_permission_for_model
|
|
from utilities.query import reapply_model_ordering
|
|
from utilities.request import safe_for_redirect
|
|
from utilities.string import title
|
|
from utilities.tables import get_table_configs
|
|
from utilities.views import GetReturnURLMixin, get_action_url
|
|
|
|
from .base import BaseMultiObjectView
|
|
from .mixins import ActionsMixin, TableMixin
|
|
from .utils import get_prerequisite_model
|
|
|
|
__all__ = (
|
|
'BulkComponentCreateView',
|
|
'BulkCreateView',
|
|
'BulkDeleteView',
|
|
'BulkEditView',
|
|
'BulkImportView',
|
|
'BulkRenameView',
|
|
'ObjectListView',
|
|
)
|
|
|
|
|
|
class ObjectListView(BaseMultiObjectView, ActionsMixin, TableMixin):
|
|
"""
|
|
Display multiple objects, all the same type, as a table.
|
|
|
|
Attributes:
|
|
filterset: A django-filter FilterSet that is applied to the queryset
|
|
filterset_form: The form class used to render filter options
|
|
actions: An iterable of ObjectAction subclasses (see ActionsMixin)
|
|
"""
|
|
template_name = 'generic/object_list.html'
|
|
filterset = None
|
|
filterset_form = None
|
|
actions = (AddObject, BulkImport, BulkExport, BulkEdit, BulkRename, BulkDelete)
|
|
|
|
def get_required_permission(self):
|
|
return get_permission_for_model(self.queryset.model, 'view')
|
|
|
|
#
|
|
# Export methods
|
|
#
|
|
|
|
def export_yaml(self):
|
|
"""
|
|
Export the queryset of objects as concatenated YAML documents.
|
|
"""
|
|
yaml_data = [obj.to_yaml() for obj in self.queryset]
|
|
|
|
return '---\n'.join(yaml_data)
|
|
|
|
def export_table(self, table, columns=None, filename=None, delimiter=None):
|
|
"""
|
|
Export all table data in CSV format.
|
|
|
|
Args:
|
|
table: The Table instance to export
|
|
columns: A list of specific columns to include. If None, all columns will be exported.
|
|
filename: The name of the file attachment sent to the client. If None, will be determined automatically
|
|
from the queryset model name.
|
|
delimiter: The character used to separate columns (a comma is used by default)
|
|
"""
|
|
exclude_columns = {'pk', 'actions'}
|
|
if columns:
|
|
all_columns = [col_name for col_name, _ in table.selected_columns + table.available_columns]
|
|
exclude_columns.update({
|
|
col for col in all_columns if col not in columns
|
|
})
|
|
exporter = TableExport(
|
|
export_format=TableExport.CSV,
|
|
table=table,
|
|
exclude_columns=exclude_columns,
|
|
delimiter=delimiter,
|
|
)
|
|
return exporter.response(
|
|
filename=filename or f'netbox_{self.queryset.model._meta.verbose_name_plural}.csv'
|
|
)
|
|
|
|
def export_template(self, template, request):
|
|
"""
|
|
Render an ExportTemplate using the current queryset.
|
|
|
|
Args:
|
|
template: ExportTemplate instance
|
|
request: The current request
|
|
"""
|
|
try:
|
|
return template.render_to_response(queryset=self.queryset)
|
|
except Exception as e:
|
|
messages.error(
|
|
request,
|
|
_("There was an error rendering the selected export template ({template}): {error}").format(
|
|
template=template.name,
|
|
error=e
|
|
)
|
|
)
|
|
# Strip the `export` param and redirect user to the filtered objects list
|
|
query_params = request.GET.copy()
|
|
query_params.pop('export')
|
|
redirect_url = f'{request.path}?{query_params.urlencode()}'
|
|
if safe_for_redirect(redirect_url):
|
|
return redirect(redirect_url)
|
|
return redirect(get_action_url(self.queryset.model, action='list'))
|
|
|
|
#
|
|
# Request handlers
|
|
#
|
|
|
|
def get_queryset(self, request):
|
|
qs = super().get_queryset(request)
|
|
return reapply_model_ordering(qs)
|
|
|
|
def get(self, request):
|
|
"""
|
|
GET request handler.
|
|
|
|
Args:
|
|
request: The current request
|
|
"""
|
|
model = self.queryset.model
|
|
object_type = ObjectType.objects.get_for_model(model)
|
|
|
|
if self.filterset:
|
|
self.queryset = self.filterset(request.GET, self.queryset, request=request).qs
|
|
|
|
# Determine the available actions
|
|
actions = self.get_permitted_actions(request.user)
|
|
has_table_actions = any(action.multi for action in actions)
|
|
|
|
if 'export' in request.GET:
|
|
|
|
# Export the current table view
|
|
if request.GET['export'] == 'table':
|
|
table = self.get_table(self.queryset, request, has_table_actions)
|
|
columns = [name for name, _ in table.selected_columns]
|
|
delimiter = request.user.config.get('csv_delimiter')
|
|
return self.export_table(table, columns, delimiter=delimiter)
|
|
|
|
# Render an ExportTemplate
|
|
if request.GET['export']:
|
|
template = get_object_or_404(ExportTemplate, object_types=object_type, name=request.GET['export'])
|
|
return self.export_template(template, request)
|
|
|
|
# Check for YAML export support on the model
|
|
if hasattr(model, 'to_yaml'):
|
|
response = HttpResponse(self.export_yaml(), content_type='text/yaml')
|
|
filename = 'netbox_{}.yaml'.format(self.queryset.model._meta.verbose_name_plural)
|
|
response['Content-Disposition'] = 'attachment; filename="{}"'.format(filename)
|
|
return response
|
|
|
|
# Fall back to default table/YAML export
|
|
table = self.get_table(self.queryset, request, has_table_actions)
|
|
delimiter = request.user.config.get('csv_delimiter')
|
|
return self.export_table(table, delimiter=delimiter)
|
|
|
|
# Render the objects table
|
|
table = self.get_table(self.queryset, request, has_table_actions)
|
|
|
|
# If this is an HTMX request, return only the rendered table HTML
|
|
if htmx_partial(request):
|
|
if request.GET.get('embedded', False):
|
|
table.embedded = True
|
|
# Hide selection checkboxes
|
|
if 'pk' in table.base_columns:
|
|
table.columns.hide('pk')
|
|
return render(request, 'htmx/table.html', {
|
|
'table': table,
|
|
'model': model,
|
|
'actions': actions,
|
|
})
|
|
|
|
context = {
|
|
'model': model,
|
|
'table': table,
|
|
'table_configs': get_table_configs(table, request.user),
|
|
'actions': actions,
|
|
'filter_form': self.filterset_form(request.GET) if self.filterset_form else None,
|
|
'prerequisite_model': get_prerequisite_model(self.queryset),
|
|
**self.get_extra_context(request),
|
|
}
|
|
|
|
return render(request, self.template_name, context)
|
|
|
|
|
|
class BulkCreateView(GetReturnURLMixin, BaseMultiObjectView):
|
|
"""
|
|
Create new objects in bulk.
|
|
|
|
form: Form class which provides the `pattern` field
|
|
model_form: The ModelForm used to create individual objects
|
|
pattern_target: Name of the field to be evaluated as a pattern (if any)
|
|
"""
|
|
form = None
|
|
model_form = None
|
|
pattern_target = ''
|
|
htmx_template_name = 'htmx/bulk_add_form.html'
|
|
|
|
def get_required_permission(self):
|
|
return get_permission_for_model(self.queryset.model, 'add')
|
|
|
|
def _create_objects(self, form, request):
|
|
new_objects = []
|
|
|
|
# Create objects from the expanded. Abort the transaction on the first validation error.
|
|
for value in form.cleaned_data['pattern']:
|
|
|
|
# Reinstantiate the model form each time to avoid overwriting the same instance. Use a mutable
|
|
# copy of the POST QueryDict so that we can update the target field value.
|
|
model_form = self.model_form(request.POST.copy())
|
|
model_form.data[self.pattern_target] = value
|
|
|
|
# Validate each new object independently.
|
|
if model_form.is_valid():
|
|
obj = model_form.save()
|
|
new_objects.append(obj)
|
|
else:
|
|
# Copy any errors on the pattern target field to the pattern form.
|
|
errors = model_form.errors.as_data()
|
|
if errors.get(self.pattern_target):
|
|
form.add_error('pattern', errors[self.pattern_target])
|
|
# Raise an IntegrityError to break the for loop and abort the transaction.
|
|
raise IntegrityError()
|
|
|
|
return new_objects
|
|
|
|
def _get_context(self, request, form, model_form):
|
|
model = self.queryset.model
|
|
return {
|
|
'object': None,
|
|
'obj_type': model._meta.verbose_name,
|
|
'obj_type_plural': model._meta.verbose_name_plural,
|
|
'form': form,
|
|
'model_form': model_form,
|
|
'return_url': self.get_return_url(request),
|
|
'add_url': get_action_url(model, 'add'),
|
|
**self.get_extra_context(request),
|
|
}
|
|
|
|
#
|
|
# Request handlers
|
|
#
|
|
|
|
def get(self, request):
|
|
# Set initial values for visible form fields from query args
|
|
initial = {}
|
|
for field in getattr(self.model_form._meta, 'fields', []):
|
|
if request.GET.get(field):
|
|
initial[field] = request.GET[field]
|
|
|
|
form = self.form()
|
|
model_form = self.model_form(initial=initial)
|
|
|
|
# HTMX partial: only re-render the model form fields
|
|
if htmx_partial(request):
|
|
return render(request, self.htmx_template_name, {
|
|
'model_form': model_form,
|
|
})
|
|
|
|
return render(request, self.template_name, self._get_context(request, form, model_form))
|
|
|
|
def post(self, request):
|
|
logger = logging.getLogger('netbox.views.BulkCreateView')
|
|
model = self.queryset.model
|
|
form = self.form(request.POST)
|
|
model_form = self.model_form(request.POST)
|
|
|
|
# HTMX partial: only re-render the model form fields
|
|
if htmx_partial(request):
|
|
return render(request, self.htmx_template_name, {
|
|
'model_form': model_form,
|
|
})
|
|
|
|
if form.is_valid():
|
|
logger.debug("Form validation was successful")
|
|
|
|
try:
|
|
with transaction.atomic(using=router.db_for_write(model)):
|
|
new_objs = self._create_objects(form, request)
|
|
|
|
# Enforce object-level permissions
|
|
if self.queryset.filter(pk__in=[obj.pk for obj in new_objs]).count() != len(new_objs):
|
|
raise PermissionsViolation
|
|
|
|
# If we make it to this point, validation has succeeded on all new objects.
|
|
msg = f"Added {len(new_objs)} {model._meta.verbose_name_plural}"
|
|
logger.info(msg)
|
|
messages.success(request, msg)
|
|
|
|
if '_addanother' in request.POST and safe_for_redirect(request.path):
|
|
return redirect(request.path)
|
|
return redirect(self.get_return_url(request))
|
|
|
|
except IntegrityError:
|
|
pass
|
|
|
|
except (AbortRequest, PermissionsViolation) as e:
|
|
logger.debug(e.message)
|
|
form.add_error(None, e.message)
|
|
clear_events.send(sender=self)
|
|
|
|
else:
|
|
logger.debug("Form validation failed")
|
|
|
|
return render(request, self.template_name, self._get_context(request, form, model_form))
|
|
|
|
|
|
class BulkImportView(GetReturnURLMixin, BaseMultiObjectView):
|
|
"""
|
|
Import objects in bulk (CSV/JSON/YAML format).
|
|
|
|
Attributes:
|
|
model_form: The form used to create each imported object
|
|
"""
|
|
template_name = 'generic/bulk_import.html'
|
|
model_form = None
|
|
related_object_forms = dict()
|
|
|
|
def get_required_permission(self):
|
|
return get_permission_for_model(self.queryset.model, 'add')
|
|
|
|
def prep_related_object_data(self, parent, data):
|
|
"""
|
|
Hook to modify the data for related objects before it's passed to the related object form (for example, to
|
|
assign a parent object).
|
|
"""
|
|
return data
|
|
|
|
def _get_form_fields(self):
|
|
form = self.model_form()
|
|
required_fields = {}
|
|
optional_fields = {}
|
|
|
|
# Return only visible fields, with required fields listed first
|
|
for field in form.visible_fields():
|
|
if field.is_hidden:
|
|
continue
|
|
if field.field.required:
|
|
required_fields[field.name] = field.field
|
|
else:
|
|
optional_fields[field.name] = field.field
|
|
|
|
return {**required_fields, **optional_fields}
|
|
|
|
def _compile_form_errors(self, errors, index, prefix=None):
|
|
error_messages = []
|
|
for field_name, errors in errors.items():
|
|
prefix = f'{prefix}.' if prefix else ''
|
|
if field_name == '__all__':
|
|
field_name = ''
|
|
for err in errors:
|
|
error_messages.append(f"Record {index} {prefix}{field_name}: {err}")
|
|
return error_messages
|
|
|
|
def _save_object(self, model_form, request, parent_idx):
|
|
_action = 'Updated' if model_form.instance.pk else 'Created'
|
|
|
|
# Save the primary object
|
|
obj = self.save_object(model_form, request)
|
|
|
|
# Enforce object-level permissions
|
|
if not self.queryset.filter(pk=obj.pk).first():
|
|
raise PermissionsViolation()
|
|
|
|
# Iterate through the related object forms (if any), validating and saving each instance.
|
|
for field_name, related_object_form in self.related_object_forms.items():
|
|
|
|
related_objects = model_form.data.get(field_name, list())
|
|
if not isinstance(related_objects, list):
|
|
raise ValidationError(
|
|
self._compile_form_errors(
|
|
{field_name: [_("Must be a list.")]},
|
|
index=parent_idx
|
|
)
|
|
)
|
|
|
|
related_obj_pks = []
|
|
for i, rel_obj_data in enumerate(related_objects, start=1):
|
|
if not isinstance(rel_obj_data, dict):
|
|
raise ValidationError(
|
|
self._compile_form_errors(
|
|
{f'{field_name}[{i}]': [_("Must be a dictionary.")]},
|
|
index=parent_idx,
|
|
)
|
|
)
|
|
|
|
rel_obj_data = self.prep_related_object_data(obj, rel_obj_data)
|
|
f = related_object_form(rel_obj_data)
|
|
|
|
for subfield_name, field in f.fields.items():
|
|
if subfield_name not in rel_obj_data and hasattr(field, 'initial'):
|
|
f.data[subfield_name] = field.initial
|
|
|
|
if f.is_valid():
|
|
related_obj = f.save()
|
|
related_obj_pks.append(related_obj.pk)
|
|
else:
|
|
# Replicate errors on the related object form to the import form for display and abort
|
|
raise ValidationError(
|
|
self._compile_form_errors(f.errors, index=parent_idx, prefix=f'{field_name}[{i}]')
|
|
)
|
|
|
|
# Enforce object-level permissions on related objects
|
|
model = related_object_form.Meta.model
|
|
if model.objects.filter(pk__in=related_obj_pks).count() != len(related_obj_pks):
|
|
raise ObjectDoesNotExist
|
|
|
|
if is_background_request(request):
|
|
request.job.logger.info(f'{_action} {obj}')
|
|
|
|
return obj
|
|
|
|
def save_object(self, object_form, request):
|
|
"""
|
|
Provide a hook to modify the object immediately before saving it (e.g. to encrypt secret data).
|
|
|
|
Args:
|
|
object_form: The model form instance
|
|
request: The current request
|
|
"""
|
|
return object_form.save()
|
|
|
|
def _process_import_records(self, form, request, records, prefetched_objects):
|
|
"""
|
|
Process CSV import records and save objects.
|
|
"""
|
|
saved_objects = []
|
|
|
|
for i, record in enumerate(records, start=1):
|
|
object_id = int(record.pop('id')) if record.get('id') else None
|
|
|
|
# Determine whether this object is being created or updated
|
|
if object_id:
|
|
try:
|
|
instance = prefetched_objects[object_id]
|
|
except KeyError:
|
|
raise ValidationError(
|
|
self._compile_form_errors(
|
|
{'id': [_("Object with ID {id} does not exist").format(id=object_id)]},
|
|
index=i
|
|
)
|
|
)
|
|
|
|
# Take a snapshot for change logging
|
|
if instance.pk and hasattr(instance, 'snapshot'):
|
|
instance.snapshot()
|
|
|
|
else:
|
|
instance = self.queryset.model()
|
|
|
|
# For newly created objects, apply any default values for custom fields
|
|
for cf in CustomField.objects.get_for_model(self.queryset.model):
|
|
if cf.ui_editable != CustomFieldUIEditableChoices.YES:
|
|
# Skip custom fields which are not editable via the UI
|
|
continue
|
|
field_name = f'cf_{cf.name}'
|
|
if field_name not in record:
|
|
record[field_name] = cf.default
|
|
|
|
# Record changelog message (if any)
|
|
instance._changelog_message = form.cleaned_data.get('changelog_message', '')
|
|
|
|
# Instantiate the model form for the object
|
|
model_form_kwargs = {
|
|
'data': record,
|
|
'instance': instance,
|
|
}
|
|
if hasattr(form, '_csv_headers'):
|
|
model_form_kwargs['headers'] = form._csv_headers # Add CSV headers
|
|
model_form = self.model_form(**model_form_kwargs)
|
|
|
|
# When updating, omit all form fields other than those specified in the record. (No
|
|
# fields are required when modifying an existing object.)
|
|
if object_id:
|
|
unused_fields = [f for f in model_form.fields if f not in record]
|
|
for field_name in unused_fields:
|
|
del model_form.fields[field_name]
|
|
|
|
restrict_form_fields(model_form, request.user)
|
|
|
|
if model_form.is_valid():
|
|
obj = self._save_object(model_form, request, i)
|
|
saved_objects.append(obj)
|
|
else:
|
|
# Raise model form errors
|
|
raise ValidationError(
|
|
self._compile_form_errors(model_form.errors, index=i)
|
|
)
|
|
|
|
return saved_objects
|
|
|
|
def create_and_update_objects(self, form, request):
|
|
records = list(form.cleaned_data['data'])
|
|
|
|
# Prefetch objects to be updated, if any
|
|
prefetch_ids = [int(record['id']) for record in records if record.get('id')]
|
|
|
|
# check for duplicate IDs
|
|
duplicate_pks = [pk for pk, count in Counter(prefetch_ids).items() if count > 1]
|
|
if duplicate_pks:
|
|
error_msg = _(
|
|
"Duplicate objects found: {model} with ID(s) {ids} appears multiple times"
|
|
).format(
|
|
model=title(self.queryset.model._meta.verbose_name),
|
|
ids=', '.join(str(pk) for pk in sorted(duplicate_pks))
|
|
)
|
|
raise ValidationError(error_msg)
|
|
|
|
prefetched_objects = {
|
|
obj.pk: obj
|
|
for obj in self.queryset.model.objects.filter(id__in=prefetch_ids)
|
|
} if prefetch_ids else {}
|
|
|
|
# For MPTT models, delay tree updates until all saves are complete
|
|
if issubclass(self.queryset.model, MPTTModel):
|
|
with self.queryset.model.objects.delay_mptt_updates():
|
|
saved_objects = self._process_import_records(form, request, records, prefetched_objects)
|
|
else:
|
|
saved_objects = self._process_import_records(form, request, records, prefetched_objects)
|
|
|
|
return saved_objects
|
|
|
|
#
|
|
# Request handlers
|
|
#
|
|
|
|
def get(self, request):
|
|
model = self.model_form._meta.model
|
|
form = BulkImportForm()
|
|
if not issubclass(model, ChangeLoggingMixin):
|
|
form.fields.pop('changelog_message')
|
|
|
|
return render(request, self.template_name, {
|
|
'model': model,
|
|
'form': form,
|
|
'fields': self._get_form_fields(),
|
|
'return_url': self.get_return_url(request),
|
|
**self.get_extra_context(request),
|
|
})
|
|
|
|
def post(self, request):
|
|
logger = logging.getLogger('netbox.views.BulkImportView')
|
|
model = self.model_form._meta.model
|
|
form = BulkImportForm(request.POST, request.FILES)
|
|
if not issubclass(model, ChangeLoggingMixin):
|
|
form.fields.pop('changelog_message')
|
|
|
|
if form.is_valid():
|
|
logger.debug("Import form validation was successful")
|
|
redirect_url = get_action_url(model, action='list')
|
|
|
|
# If indicated, defer this request to a background job & redirect the user
|
|
if form.cleaned_data['background_job']:
|
|
job_name = _('Bulk import {count} {object_type}').format(
|
|
count=len(form.cleaned_data['data']),
|
|
object_type=model._meta.verbose_name_plural,
|
|
)
|
|
if process_request_as_job(self.__class__, request, name=job_name):
|
|
return redirect(redirect_url)
|
|
|
|
try:
|
|
# Iterate through data and bind each record to a new model form instance.
|
|
with transaction.atomic(using=router.db_for_write(model)):
|
|
new_objects = self.create_and_update_objects(form, request)
|
|
|
|
# Enforce object-level permissions
|
|
if self.queryset.filter(pk__in=[obj.pk for obj in new_objects]).count() != len(new_objects):
|
|
raise PermissionsViolation
|
|
|
|
msg = _('Imported {count} {object_type}').format(
|
|
count=len(new_objects),
|
|
object_type=model._meta.verbose_name_plural
|
|
)
|
|
logger.info(msg)
|
|
|
|
# Handle background job
|
|
if is_background_request(request):
|
|
request.job.logger.info(msg)
|
|
return None
|
|
|
|
messages.success(request, msg)
|
|
return redirect(f"{redirect_url}?modified_by_request={request.id}")
|
|
|
|
except (AbortRequest, PermissionsViolation, ValidationError) as e:
|
|
err_messages = e.messages if type(e) is ValidationError else [e.message]
|
|
for msg in err_messages:
|
|
logger.debug(msg)
|
|
form.add_error(None, msg)
|
|
if is_background_request(request):
|
|
request.job.logger.error(msg)
|
|
request.job.logger.warning("Bulk import aborted")
|
|
clear_events.send(sender=self)
|
|
if is_background_request(request):
|
|
raise JobFailed
|
|
|
|
else:
|
|
logger.debug("Form validation failed")
|
|
|
|
return render(request, self.template_name, {
|
|
'model': model,
|
|
'form': form,
|
|
'fields': self._get_form_fields(),
|
|
'return_url': self.get_return_url(request),
|
|
**self.get_extra_context(request),
|
|
})
|
|
|
|
|
|
class BulkEditView(GetReturnURLMixin, BaseMultiObjectView):
|
|
"""
|
|
Edit objects in bulk.
|
|
|
|
Attributes:
|
|
filterset: FilterSet to apply when deleting by QuerySet
|
|
form: The form class used to edit objects in bulk
|
|
"""
|
|
template_name = 'generic/bulk_edit.html'
|
|
filterset = None
|
|
form = None
|
|
|
|
def get_required_permission(self):
|
|
return get_permission_for_model(self.queryset.model, 'change')
|
|
|
|
def post_save_operations(self, form, obj):
|
|
"""
|
|
This method is called for each object in _update_objects. Override to perform additional object-level
|
|
operations that are specific to a particular ModelForm.
|
|
"""
|
|
# Add/remove tags
|
|
if form.cleaned_data.get('add_tags', None):
|
|
obj.tags.add(*form.cleaned_data['add_tags'])
|
|
if form.cleaned_data.get('remove_tags', None):
|
|
obj.tags.remove(*form.cleaned_data['remove_tags'])
|
|
|
|
def _update_objects(self, form, request):
|
|
custom_fields = getattr(form, 'custom_fields', {})
|
|
standard_fields = [
|
|
field for field in form.fields if field not in list(custom_fields) + ['pk']
|
|
]
|
|
nullified_fields = request.POST.getlist('_nullify')
|
|
updated_objects = []
|
|
model_fields = {}
|
|
m2m_fields = {}
|
|
|
|
# Build list of model fields and m2m fields for later iteration
|
|
for name in standard_fields:
|
|
try:
|
|
model_field = self.queryset.model._meta.get_field(name)
|
|
if isinstance(model_field, (ManyToManyField, ManyToManyRel)):
|
|
m2m_fields[name] = model_field
|
|
elif isinstance(model_field, GenericRel):
|
|
# Ignore generic relations (these may be used for other purposes in the form)
|
|
continue
|
|
else:
|
|
model_fields[name] = model_field
|
|
except FieldDoesNotExist:
|
|
# This form field is used to modify a field rather than set its value directly
|
|
model_fields[name] = None
|
|
|
|
for obj in self.queryset.filter(pk__in=form.cleaned_data['pk']):
|
|
|
|
# Take a snapshot of change-logged models
|
|
if hasattr(obj, 'snapshot'):
|
|
obj.snapshot()
|
|
|
|
# Attach the changelog message (if any) to the object
|
|
obj._changelog_message = form.cleaned_data.get('changelog_message')
|
|
|
|
# Update standard fields. If a field is listed in _nullify, delete its value.
|
|
for name, model_field in model_fields.items():
|
|
# Handle nullification
|
|
if name in form.nullable_fields and name in nullified_fields:
|
|
if type(model_field) is GenericForeignKey:
|
|
setattr(obj, name, None)
|
|
else:
|
|
setattr(obj, name, None if model_field.null else '')
|
|
# Normal fields
|
|
elif name in form.changed_data:
|
|
setattr(obj, name, form.cleaned_data[name])
|
|
|
|
# Update custom fields
|
|
for name, customfield in custom_fields.items():
|
|
assert name.startswith('cf_')
|
|
cf_name = name[3:] # Strip cf_ prefix
|
|
if name in form.nullable_fields and name in nullified_fields:
|
|
obj.custom_field_data[cf_name] = None
|
|
elif name in form.changed_data:
|
|
obj.custom_field_data[cf_name] = customfield.serialize(form.cleaned_data[name])
|
|
|
|
# Store M2M values for validation
|
|
obj._m2m_values = {}
|
|
for field in obj._meta.local_many_to_many:
|
|
if value := form.cleaned_data.get(field.name):
|
|
obj._m2m_values[field.name] = list(value)
|
|
elif field.name in nullified_fields:
|
|
obj._m2m_values[field.name] = []
|
|
|
|
obj.full_clean()
|
|
obj.save()
|
|
updated_objects.append(obj)
|
|
|
|
# Handle M2M fields after save
|
|
for name, m2m_field in m2m_fields.items():
|
|
if name in form.nullable_fields and name in nullified_fields:
|
|
getattr(obj, name).clear()
|
|
elif form.cleaned_data[name]:
|
|
getattr(obj, name).set(form.cleaned_data[name])
|
|
|
|
self.post_save_operations(form, obj)
|
|
|
|
if is_background_request(request):
|
|
request.job.logger.info(f"Updated {obj}")
|
|
|
|
# Rebuild the tree for MPTT models
|
|
if issubclass(self.queryset.model, MPTTModel):
|
|
self.queryset.model.objects.rebuild()
|
|
|
|
return updated_objects
|
|
|
|
#
|
|
# Request handlers
|
|
#
|
|
|
|
def get(self, request):
|
|
return redirect(self.get_return_url(request))
|
|
|
|
def post(self, request, **kwargs):
|
|
logger = logging.getLogger('netbox.views.BulkEditView')
|
|
model = self.queryset.model
|
|
|
|
# If we are editing *all* objects in the queryset, replace the PK list with all matched objects.
|
|
if request.POST.get('_all') and self.filterset is not None:
|
|
pk_list = self.filterset(request.GET, self.queryset.values_list('pk', flat=True), request=request).qs
|
|
else:
|
|
pk_list = request.POST.getlist('pk')
|
|
|
|
# Include the PK list as initial data for the form
|
|
initial_data = {'pk': pk_list}
|
|
|
|
# Check for other contextual data needed for the form. We avoid passing all of request.GET because the
|
|
# filter values will conflict with the bulk edit form fields.
|
|
# TODO: Find a better way to accomplish this
|
|
if 'device' in request.GET:
|
|
initial_data['device'] = request.GET.get('device')
|
|
elif 'device_type' in request.GET:
|
|
initial_data['device_type'] = request.GET.get('device_type')
|
|
elif 'virtual_machine' in request.GET:
|
|
initial_data['virtual_machine'] = request.GET.get('virtual_machine')
|
|
|
|
post_data = request.POST.copy()
|
|
post_data.setlist('pk', pk_list)
|
|
form = self.form(post_data, initial=initial_data)
|
|
restrict_form_fields(form, request.user)
|
|
|
|
if '_apply' in request.POST:
|
|
if form.is_valid():
|
|
logger.debug("Form validation was successful")
|
|
|
|
# If indicated, defer this request to a background job & redirect the user
|
|
if form.cleaned_data['background_job']:
|
|
job_name = _('Bulk edit {count} {object_type}').format(
|
|
count=len(form.cleaned_data['pk']),
|
|
object_type=model._meta.verbose_name_plural,
|
|
)
|
|
if process_request_as_job(self.__class__, request, name=job_name):
|
|
return redirect(self.get_return_url(request))
|
|
|
|
try:
|
|
with transaction.atomic(using=router.db_for_write(model)):
|
|
updated_objects = self._update_objects(form, request)
|
|
|
|
# Enforce object-level permissions
|
|
object_count = self.queryset.filter(pk__in=[obj.pk for obj in updated_objects]).count()
|
|
if object_count != len(updated_objects):
|
|
raise PermissionsViolation
|
|
|
|
msg = _('Updated {count} {object_type}').format(
|
|
count=len(updated_objects),
|
|
object_type=model._meta.verbose_name_plural,
|
|
)
|
|
logger.info(msg)
|
|
|
|
# Handle background job
|
|
if is_background_request(request):
|
|
request.job.logger.info(msg)
|
|
return None
|
|
|
|
messages.success(self.request, msg)
|
|
return redirect(self.get_return_url(request))
|
|
|
|
except (AbortRequest, PermissionsViolation, ValidationError) as e:
|
|
err_messages = e.messages if type(e) is ValidationError else [e.message]
|
|
for msg in err_messages:
|
|
logger.debug(msg)
|
|
form.add_error(None, msg)
|
|
if is_background_request(request):
|
|
request.job.logger.error(msg)
|
|
clear_events.send(sender=self)
|
|
if is_background_request(request):
|
|
raise JobFailed
|
|
|
|
else:
|
|
logger.debug("Form validation failed")
|
|
|
|
# Retrieve objects being edited
|
|
table = self.table(self.queryset.filter(pk__in=pk_list), orderable=False)
|
|
if not table.rows:
|
|
messages.warning(
|
|
request,
|
|
_("No {object_type} were selected.").format(object_type=model._meta.verbose_name_plural)
|
|
)
|
|
return redirect(self.get_return_url(request))
|
|
|
|
return render(request, self.template_name, {
|
|
'model': model,
|
|
'form': form,
|
|
'table': table,
|
|
'return_url': self.get_return_url(request),
|
|
**self.get_extra_context(request),
|
|
})
|
|
|
|
|
|
class BulkRenameView(GetReturnURLMixin, BaseMultiObjectView):
|
|
"""
|
|
An extendable view for renaming objects in bulk.
|
|
|
|
Attributes:
|
|
field_name: The name of the object attribute for which the value is being updated (defaults to "name")
|
|
"""
|
|
field_name = 'name'
|
|
template_name = 'generic/bulk_rename.html'
|
|
# Match BulkEditView/BulkDeleteView behavior: allow passing a FilterSet
|
|
# so "Select all N matching query" can expand across the full queryset.
|
|
filterset = None
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
|
|
# Create a new Form class from BulkRenameForm
|
|
class _Form(BulkRenameForm):
|
|
pk = ModelMultipleChoiceField(
|
|
queryset=self.queryset,
|
|
widget=MultipleHiddenInput()
|
|
)
|
|
|
|
self.form = _Form
|
|
|
|
def get_required_permission(self):
|
|
return get_permission_for_model(self.queryset.model, 'change')
|
|
|
|
def _rename_objects(self, form, selected_objects):
|
|
renamed_pks = []
|
|
|
|
for obj in selected_objects:
|
|
# Take a snapshot of change-logged models
|
|
if hasattr(obj, 'snapshot'):
|
|
obj.snapshot()
|
|
|
|
find = form.cleaned_data['find']
|
|
replace = form.cleaned_data['replace']
|
|
if form.cleaned_data['use_regex']:
|
|
try:
|
|
obj.new_name = re.sub(find, replace, getattr(obj, self.field_name, '') or '')
|
|
# Catch regex group reference errors
|
|
except re.error:
|
|
obj.new_name = getattr(obj, self.field_name)
|
|
else:
|
|
obj.new_name = (getattr(obj, self.field_name, '') or '').replace(find, replace)
|
|
renamed_pks.append(obj.pk)
|
|
|
|
return renamed_pks
|
|
|
|
def post(self, request):
|
|
logger = logging.getLogger('netbox.views.BulkRenameView')
|
|
|
|
# If we are editing *all* objects in the queryset, replace the PK list with all matched objects.
|
|
if request.POST.get('_all') and self.filterset is not None:
|
|
pk_list = self.filterset(request.GET, self.queryset.values_list('pk', flat=True), request=request).qs
|
|
else:
|
|
pk_list = request.POST.getlist('pk')
|
|
|
|
selected_objects = self.queryset.filter(pk__in=pk_list)
|
|
|
|
if '_preview' in request.POST or '_apply' in request.POST:
|
|
form = self.form(request.POST, initial={'pk': pk_list})
|
|
|
|
if form.is_valid():
|
|
try:
|
|
with transaction.atomic(using=router.db_for_write(self.queryset.model)):
|
|
renamed_pks = self._rename_objects(form, selected_objects)
|
|
|
|
if '_apply' in request.POST:
|
|
# For MPTT models, delay tree updates until all saves are complete
|
|
if issubclass(self.queryset.model, MPTTModel):
|
|
with self.queryset.model.objects.delay_mptt_updates():
|
|
for obj in selected_objects:
|
|
setattr(obj, self.field_name, obj.new_name)
|
|
obj.save()
|
|
else:
|
|
for obj in selected_objects:
|
|
setattr(obj, self.field_name, obj.new_name)
|
|
obj.save()
|
|
|
|
# Enforce constrained permissions
|
|
if self.queryset.filter(pk__in=renamed_pks).count() != len(selected_objects):
|
|
raise PermissionsViolation
|
|
|
|
messages.success(
|
|
request,
|
|
_("Renamed {count} {object_type}").format(
|
|
count=len(selected_objects),
|
|
object_type=self.queryset.model._meta.verbose_name_plural
|
|
)
|
|
)
|
|
return redirect(self.get_return_url(request))
|
|
|
|
except IntegrityError as e:
|
|
messages.error(self.request, ", ".join(e.args))
|
|
clear_events.send(sender=self)
|
|
|
|
except (AbortRequest, PermissionsViolation) as e:
|
|
logger.debug(e.message)
|
|
form.add_error(None, e.message)
|
|
clear_events.send(sender=self)
|
|
|
|
else:
|
|
form = self.form(initial={'pk': pk_list})
|
|
|
|
return render(request, self.template_name, {
|
|
'field_name': self.field_name,
|
|
'form': form,
|
|
'obj_type_plural': self.queryset.model._meta.verbose_name_plural,
|
|
'selected_objects': selected_objects,
|
|
'return_url': self.get_return_url(request),
|
|
})
|
|
|
|
|
|
class BulkDeleteView(GetReturnURLMixin, BaseMultiObjectView):
|
|
"""
|
|
Delete objects in bulk.
|
|
|
|
Attributes:
|
|
filterset: FilterSet to apply when deleting by QuerySet
|
|
table: The table used to display devices being deleted
|
|
"""
|
|
template_name = 'generic/bulk_delete.html'
|
|
filterset = None
|
|
table = None
|
|
|
|
def get_required_permission(self):
|
|
return get_permission_for_model(self.queryset.model, 'delete')
|
|
|
|
#
|
|
# Request handlers
|
|
#
|
|
|
|
def get(self, request):
|
|
return redirect(self.get_return_url(request))
|
|
|
|
def post(self, request, **kwargs):
|
|
logger = logging.getLogger('netbox.views.BulkDeleteView')
|
|
model = self.queryset.model
|
|
|
|
# Are we deleting *all* objects in the queryset or just a selected subset?
|
|
if request.POST.get('_all'):
|
|
qs = model.objects.all()
|
|
if self.filterset is not None:
|
|
qs = self.filterset(request.GET, qs, request=request).qs
|
|
pk_list = qs.only('pk').values_list('pk', flat=True)
|
|
else:
|
|
pk_list = [int(pk) for pk in request.POST.getlist('pk')]
|
|
|
|
if '_confirm' in request.POST:
|
|
form = BulkDeleteForm(model, request.POST)
|
|
if form.is_valid():
|
|
logger.debug("Form validation was successful")
|
|
|
|
# If indicated, defer this request to a background job & redirect the user
|
|
if form.cleaned_data['background_job']:
|
|
job_name = _('Bulk delete {count} {object_type}').format(
|
|
count=len(form.cleaned_data['pk']),
|
|
object_type=model._meta.verbose_name_plural,
|
|
)
|
|
if process_request_as_job(self.__class__, request, name=job_name):
|
|
return redirect(self.get_return_url(request))
|
|
|
|
# Delete objects
|
|
queryset = self.queryset.filter(pk__in=pk_list)
|
|
deleted_count = queryset.count()
|
|
try:
|
|
with transaction.atomic(using=router.db_for_write(model)):
|
|
for obj in queryset:
|
|
|
|
# Take a snapshot of change-logged models
|
|
if hasattr(obj, 'snapshot'):
|
|
obj.snapshot()
|
|
|
|
# Attach the changelog message (if any) to the object
|
|
obj._changelog_message = form.cleaned_data.get('changelog_message')
|
|
|
|
# Delete the object
|
|
obj.delete()
|
|
|
|
if is_background_request(request):
|
|
request.job.logger.info(f"Deleted {obj}")
|
|
|
|
msg = _('Deleted {count} {object_type}').format(
|
|
count=deleted_count,
|
|
object_type=model._meta.verbose_name_plural
|
|
)
|
|
logger.info(msg)
|
|
|
|
# Handle background job
|
|
if is_background_request(request):
|
|
request.job.logger.info(msg)
|
|
return None
|
|
|
|
messages.success(request, msg)
|
|
|
|
except (ProtectedError, RestrictedError) as e:
|
|
logger.warning(f"Caught {type(e)} while attempting to delete objects")
|
|
if is_background_request(request):
|
|
request.job.logger.error(
|
|
_("Deletion failed due to the presence of one or more dependent objects.")
|
|
)
|
|
raise JobFailed
|
|
handle_protectederror(queryset, request, e)
|
|
|
|
except AbortRequest as e:
|
|
logger.debug(e.message)
|
|
if is_background_request(request):
|
|
request.job.logger.error(e.message)
|
|
raise JobFailed
|
|
messages.error(request, mark_safe(e.message))
|
|
|
|
return redirect(self.get_return_url(request))
|
|
|
|
logger.debug("Form validation failed")
|
|
|
|
else:
|
|
form = BulkDeleteForm(model, initial={
|
|
'pk': pk_list,
|
|
'return_url': self.get_return_url(request),
|
|
})
|
|
|
|
# Retrieve objects being deleted
|
|
table = self.table(self.queryset.filter(pk__in=pk_list), orderable=False)
|
|
if not table.rows:
|
|
messages.warning(
|
|
request,
|
|
_("No {object_type} were selected.").format(object_type=model._meta.verbose_name_plural)
|
|
)
|
|
return redirect(self.get_return_url(request))
|
|
|
|
return render(request, self.template_name, {
|
|
'model': model,
|
|
'form': form,
|
|
'table': table,
|
|
'return_url': self.get_return_url(request),
|
|
**self.get_extra_context(request),
|
|
})
|
|
|
|
|
|
#
|
|
# Device/VirtualMachine components
|
|
#
|
|
|
|
class BulkComponentCreateView(GetReturnURLMixin, BaseMultiObjectView):
|
|
"""
|
|
Add one or more components (e.g. interfaces, console ports, etc.) to a set of Devices or VirtualMachines.
|
|
"""
|
|
template_name = 'generic/bulk_add_component.html'
|
|
parent_model = None
|
|
parent_field = None
|
|
form = None
|
|
model_form = None
|
|
filterset = None
|
|
table = None
|
|
|
|
def get_required_permission(self):
|
|
return f'dcim.add_{self.queryset.model._meta.model_name}'
|
|
|
|
def post(self, request):
|
|
logger = logging.getLogger('netbox.views.BulkComponentCreateView')
|
|
parent_model_name = self.parent_model._meta.verbose_name_plural
|
|
model_name = self.queryset.model._meta.verbose_name_plural
|
|
|
|
# Are we editing *all* objects in the queryset or just a selected subset?
|
|
if request.POST.get('_all') and self.filterset is not None:
|
|
queryset = self.filterset(request.GET, self.parent_model.objects.only('pk'), request=request).qs
|
|
pk_list = [obj.pk for obj in queryset]
|
|
else:
|
|
pk_list = [int(pk) for pk in request.POST.getlist('pk')]
|
|
|
|
selected_objects = self.parent_model.objects.filter(pk__in=pk_list)
|
|
if not selected_objects:
|
|
messages.warning(
|
|
request,
|
|
_("No {object_type} were selected.").format(object_type=self.parent_model._meta.verbose_name_plural)
|
|
)
|
|
return redirect(self.get_return_url(request))
|
|
table = self.table(selected_objects, orderable=False)
|
|
|
|
if '_create' in request.POST:
|
|
form = self.form(request.POST)
|
|
|
|
if form.is_valid():
|
|
logger.debug("Form validation was successful")
|
|
|
|
# If indicated, defer this request to a background job & redirect the user
|
|
if form.cleaned_data['background_job']:
|
|
job_name = _('Bulk add {count} {object_type}').format(
|
|
count=len(form.cleaned_data['pk']),
|
|
object_type=self.queryset.model._meta.verbose_name_plural,
|
|
)
|
|
if process_request_as_job(self.__class__, request, name=job_name):
|
|
return redirect(self.get_return_url(request))
|
|
|
|
new_components = []
|
|
data = deepcopy(form.cleaned_data)
|
|
changelog_message = data.pop('changelog_message', '')
|
|
data.pop('background_job', None)
|
|
replication_data = {
|
|
field: data.pop(field) for field in form.replication_fields
|
|
}
|
|
|
|
try:
|
|
with transaction.atomic(using=router.db_for_write(self.queryset.model)):
|
|
|
|
for obj in data['pk']:
|
|
|
|
pattern_count = len(replication_data[form.replication_fields[0]])
|
|
for i in range(pattern_count):
|
|
component_data = {
|
|
self.parent_field: obj.pk
|
|
}
|
|
component_data.update(data)
|
|
for field, values in replication_data.items():
|
|
if values:
|
|
component_data[field] = values[i]
|
|
|
|
component_form = self.model_form(component_data)
|
|
if component_form.is_valid():
|
|
if changelog_message:
|
|
component_form.instance._changelog_message = changelog_message
|
|
instance = component_form.save()
|
|
logger.debug(f"Created {instance} on {instance.parent_object}")
|
|
new_components.append(instance)
|
|
else:
|
|
for field, errors in component_form.errors.as_data().items():
|
|
for e in errors:
|
|
err_msg = '{}: {}'.format(obj, ', '.join(e))
|
|
form.add_error(field, err_msg)
|
|
if is_background_request(request):
|
|
request.job.logger.error(err_msg)
|
|
|
|
# Enforce object-level permissions
|
|
component_ids = [obj.pk for obj in new_components]
|
|
if self.queryset.filter(pk__in=component_ids).count() != len(new_components):
|
|
raise PermissionsViolation
|
|
|
|
except IntegrityError:
|
|
clear_events.send(sender=self)
|
|
if is_background_request(request):
|
|
request.job.logger.error(_("An integrity error occurred while creating components"))
|
|
raise JobFailed
|
|
|
|
except (AbortRequest, PermissionsViolation) as e:
|
|
logger.debug(e.message)
|
|
form.add_error(None, e.message)
|
|
clear_events.send(sender=self)
|
|
if is_background_request(request):
|
|
request.job.logger.error(e.message)
|
|
raise JobFailed
|
|
|
|
if not form.errors:
|
|
msg = _("Added {count} {component} to {parent_count} {parent}.").format(
|
|
count=len(new_components),
|
|
component=model_name,
|
|
parent_count=len(form.cleaned_data['pk']),
|
|
parent=parent_model_name,
|
|
)
|
|
logger.info(msg)
|
|
|
|
# Handle background job
|
|
if is_background_request(request):
|
|
request.job.logger.info(msg)
|
|
return None
|
|
|
|
messages.success(request, msg)
|
|
|
|
return redirect(self.get_return_url(request))
|
|
|
|
else:
|
|
logger.debug("Form validation failed")
|
|
|
|
else:
|
|
form = self.form(initial={'pk': pk_list})
|
|
|
|
return render(request, self.template_name, {
|
|
'form': form,
|
|
'parent_model_name': parent_model_name,
|
|
'model_name': model_name,
|
|
'table': table,
|
|
'return_url': self.get_return_url(request),
|
|
})
|