diff --git a/netbox/extras/api/customfields.py b/netbox/extras/api/customfields.py index 2113cd0c0..2c559f9af 100644 --- a/netbox/extras/api/customfields.py +++ b/netbox/extras/api/customfields.py @@ -85,6 +85,15 @@ class CustomFieldsDataField(Field): "values." ) + # Reject any unknown custom field names + valid_field_names = {cf.name for cf in self._get_custom_fields()} + invalid_fields = set(data.keys()) - valid_field_names + if invalid_fields: + raise ValidationError({ + field: _("Custom field '{name}' does not exist for this object type.").format(name=field) + for field in sorted(invalid_fields) + }) + # Serialize object and multi-object values for cf in self._get_custom_fields(): if cf.name in data and data[cf.name] not in CUSTOMFIELD_EMPTY_VALUES and cf.type in ( diff --git a/netbox/extras/tests/test_customfields.py b/netbox/extras/tests/test_customfields.py index a8b65f119..9dc117864 100644 --- a/netbox/extras/tests/test_customfields.py +++ b/netbox/extras/tests/test_customfields.py @@ -7,7 +7,7 @@ from django.test import tag from django.urls import reverse from rest_framework import status -from core.models import ObjectType +from core.models import ObjectChange, ObjectType from dcim.filtersets import SiteFilterSet from dcim.forms import SiteImportForm from dcim.models import Manufacturer, Rack, Site @@ -1194,6 +1194,84 @@ class CustomFieldAPITest(APITestCase): list(original_cfvs['multiobject_field']) ) + def test_update_single_object_rejects_unknown_custom_fields(self): + site2 = Site.objects.get(name='Site 2') + original_cf_data = {**site2.custom_field_data} + url = reverse('dcim-api:site-detail', kwargs={'pk': site2.pk}) + self.add_permissions('dcim.change_site') + + data = { + 'custom_fields': { + 'text_field': 'valid', + 'thisfieldshouldntexist': 'random text here', + }, + } + + response = self.client.patch(url, data, format='json', **self.header) + self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST) + self.assertIn('thisfieldshouldntexist', response.data['custom_fields']) + + # Ensure the object was not modified + site2.refresh_from_db() + self.assertEqual(site2.custom_field_data, original_cf_data) + + @tag('regression') + def test_update_single_object_prunes_stale_custom_field_data_from_database_and_changelog(self): + stale_key = 'thisfieldshouldntexist' + stale_value = 'random text here' + updated_text_value = 'ABCD' + + site2 = Site.objects.get(name='Site 2') + object_type = ObjectType.objects.get_for_model(Site) + original_text_value = site2.custom_field_data['text_field'] + + # Seed stale custom field data directly in the database to mimic a polluted row. + Site.objects.filter(pk=site2.pk).update( + custom_field_data={ + **site2.custom_field_data, + stale_key: stale_value, + } + ) + site2.refresh_from_db() + self.assertIn(stale_key, site2.custom_field_data) + + existing_change_ids = set( + ObjectChange.objects.filter( + changed_object_type=object_type, + changed_object_id=site2.pk, + ).values_list('pk', flat=True) + ) + + url = reverse('dcim-api:site-detail', kwargs={'pk': site2.pk}) + self.add_permissions('dcim.change_site') + data = { + 'custom_fields': { + 'text_field': updated_text_value, + }, + } + + response = self.client.patch(url, data, format='json', **self.header) + self.assertHttpStatus(response, status.HTTP_200_OK) + + site2.refresh_from_db() + self.assertEqual(site2.cf['text_field'], updated_text_value) + self.assertNotIn(stale_key, site2.custom_field_data) + + objectchanges = ObjectChange.objects.filter( + changed_object_type=object_type, + changed_object_id=site2.pk, + ).exclude(pk__in=existing_change_ids) + self.assertEqual(objectchanges.count(), 1) + + objectchange = objectchanges.get() + prechange_cf = objectchange.prechange_data['custom_fields'] + postchange_cf = objectchange.postchange_data['custom_fields'] + + self.assertEqual(prechange_cf['text_field'], original_text_value) + self.assertEqual(postchange_cf['text_field'], updated_text_value) + self.assertNotIn(stale_key, prechange_cf) + self.assertNotIn(stale_key, postchange_cf) + def test_specify_related_object_by_attr(self): site1 = Site.objects.get(name='Site 1') vlans = VLAN.objects.all()[:3] diff --git a/netbox/netbox/api/serializers/base.py b/netbox/netbox/api/serializers/base.py index 0d149e81c..0f933ed5f 100644 --- a/netbox/netbox/api/serializers/base.py +++ b/netbox/netbox/api/serializers/base.py @@ -95,9 +95,6 @@ class ValidatedModelSerializer(BaseModelSerializer): attrs = data.copy() - # Remove custom field data (if any) prior to model validation - attrs.pop('custom_fields', None) - # Skip ManyToManyFields opts = self.Meta.model._meta m2m_values = {} @@ -116,4 +113,8 @@ class ValidatedModelSerializer(BaseModelSerializer): # Skip uniqueness validation of individual fields inside `full_clean()` (this is handled by the serializer) instance.full_clean(validate_unique=False) + # Preserve any normalization performed by model.clean() (e.g. stale custom field pruning) + if 'custom_field_data' in attrs: + data['custom_field_data'] = instance.custom_field_data + return data diff --git a/netbox/utilities/serialization.py b/netbox/utilities/serialization.py index 5509867ae..3fa1a0920 100644 --- a/netbox/utilities/serialization.py +++ b/netbox/utilities/serialization.py @@ -31,7 +31,12 @@ def serialize_object(obj, resolve_tags=True, extra=None, exclude=None): # Include custom_field_data as "custom_fields" if 'custom_field_data' in data: - data['custom_fields'] = data.pop('custom_field_data') + custom_field_data = data.pop('custom_field_data') + custom_fields = getattr(obj, 'custom_fields', None) + if custom_fields is not None: + valid_names = {cf.name for cf in custom_fields} + custom_field_data = {key: value for key, value in custom_field_data.items() if key in valid_names} + data['custom_fields'] = custom_field_data # Resolve any assigned tags to their names. Check for tags cached on the instance; # fall back to using the manager.