Compare commits

..

1 Commits

Author SHA1 Message Date
Martin Hauser
c720b07538 fix(dcim): Refresh stale CablePath references during serialization
Cable edits can delete and recreate CablePath rows while endpoint
instances remain in memory. Deferred event serialization can then
encounter a stale `_path` reference and raise `CablePath.DoesNotExist`.

Refresh stale `_path` references through `PathEndpoint.path` and route
internal callers through that accessor. Update `EventContext` to track
the latest serialization source for coalesced duplicate enqueues, while
eagerly freezing delete-event payloads before row removal.

Also avoid mutating `event_rule.action_data` when merging the event
payload.

Fixes #21498
2026-04-01 16:00:00 +02:00
24 changed files with 1014 additions and 1041 deletions

View File

@@ -384,18 +384,6 @@ A calendar date. Returns a `datetime.date` object.
A complete date & time. Returns a `datetime.datetime` object.
## Uploading Scripts via the API
Script modules can be uploaded to NetBox via the REST API by sending a `multipart/form-data` POST request to `/api/extras/scripts/upload/`. The caller must have the `extras.add_scriptmodule` and `core.add_managedfile` permissions.
```no-highlight
curl -X POST \
-H "Authorization: Token $TOKEN" \
-H "Accept: application/json; indent=4" \
-F "file=@/path/to/myscript.py" \
http://netbox/api/extras/scripts/upload/
```
## Running Custom Scripts
!!! note

View File

@@ -1,14 +1,12 @@
# Search
Plugins can define and register their own models to extend NetBox's core search functionality. Typically, a plugin will include a file named `search.py`, which holds all search indexes for its models.
Plugins can define and register their own models to extend NetBox's core search functionality. Typically, a plugin will include a file named `search.py`, which holds all search indexes for its models (see the example below).
```python title="search.py"
```python
# search.py
from netbox.search import SearchIndex, register_search
from netbox.search import SearchIndex
from .models import MyModel
@register_search
class MyModelIndex(SearchIndex):
model = MyModel
fields = (
@@ -19,11 +17,15 @@ class MyModelIndex(SearchIndex):
display_attrs = ('site', 'device', 'status', 'description')
```
Decorate each `SearchIndex` subclass with `@register_search` to register it with NetBox. When using the default `search.py` module, no additional `indexes = [...]` list is required.
Fields listed in `display_attrs` will not be cached for search, but will be displayed alongside the object when it appears in global search results. This is helpful for conveying to the user additional information about an object.
Fields listed in `display_attrs` are not cached for matching, but they are displayed alongside the object in global search results to provide additional context.
To register one or more indexes with NetBox, define a list named `indexes` at the end of this file:
```python
indexes = [MyModelIndex]
```
!!! tip
The legacy `indexes = [...]` list remains supported via `PluginConfig.search_indexes` for backward compatibility and custom loading patterns.
The path to the list of search indexes can be modified by setting `search_indexes` in the PluginConfig instance.
::: netbox.search.SearchIndex

View File

@@ -38,7 +38,19 @@ class ConnectedEndpointsSerializer(serializers.ModelSerializer):
@extend_schema_field(serializers.BooleanField)
def get_connected_endpoints_reachable(self, obj):
return obj._path and obj._path.is_complete and obj._path.is_active
"""
Determines if the connected endpoints are reachable through a cable path.
This method checks whether there is a valid and active cable path that
connects the endpoints. It evaluates both the completeness and active
status of the path to determine reachability.
"""
# Use the public `path` accessor rather than dereferencing `_path`
# directly. `path` already handles the stale in-memory relation case
# that can occur while CablePath rows are rebuilt during cable edits.
if path := obj.path:
return path.is_complete and path.is_active
return False
class PortSerializer(serializers.ModelSerializer):

View File

@@ -6,9 +6,8 @@ from drf_spectacular.utils import extend_schema_field
from rest_framework import serializers
from dcim.choices import *
from dcim.constants import MACADDRESS_ASSIGNMENT_MODELS, MODULE_TOKEN
from dcim.constants import MACADDRESS_ASSIGNMENT_MODELS
from dcim.models import Device, DeviceBay, MACAddress, Module, VirtualDeviceContext
from dcim.utils import get_module_bay_positions, resolve_module_placeholder
from extras.api.serializers_.configtemplates import ConfigTemplateSerializer
from ipam.api.serializers_.ip import IPAddressSerializer
from netbox.api.fields import ChoiceField, ContentTypeField, RelatedObjectCountField
@@ -160,60 +159,6 @@ class ModuleSerializer(PrimaryModelSerializer):
]
brief_fields = ('id', 'url', 'display', 'device', 'module_bay', 'module_type', 'description')
def validate(self, data):
data = super().validate(data)
if self.nested:
return data
# Skip validation for existing modules (updates)
if self.instance is not None:
return data
module_bay = data.get('module_bay')
module_type = data.get('module_type')
device = data.get('device')
if not all((module_bay, module_type, device)):
return data
positions = get_module_bay_positions(module_bay)
for templates, component_attribute in [
("consoleporttemplates", "consoleports"),
("consoleserverporttemplates", "consoleserverports"),
("interfacetemplates", "interfaces"),
("powerporttemplates", "powerports"),
("poweroutlettemplates", "poweroutlets"),
("rearporttemplates", "rearports"),
("frontporttemplates", "frontports"),
]:
installed_components = {
component.name: component for component in getattr(device, component_attribute).all()
}
for template in getattr(module_type, templates).all():
resolved_name = template.name
if MODULE_TOKEN in template.name:
if not module_bay.position:
raise serializers.ValidationError(
_("Cannot install module with placeholder values in a module bay with no position defined.")
)
try:
resolved_name = resolve_module_placeholder(template.name, positions)
except ValueError as e:
raise serializers.ValidationError(str(e))
if resolved_name in installed_components:
raise serializers.ValidationError(
_("A {model} named {name} already exists").format(
model=template.component_model.__name__,
name=resolved_name
)
)
return data
class MACAddressSerializer(PrimaryModelSerializer):
assigned_object_type = ContentTypeField(

View File

@@ -3,7 +3,6 @@ from django.utils.translation import gettext_lazy as _
from dcim.choices import *
from dcim.constants import *
from dcim.utils import get_module_bay_positions, resolve_module_placeholder
from utilities.forms import get_field_value
__all__ = (
@@ -71,6 +70,18 @@ class InterfaceCommonForm(forms.Form):
class ModuleCommonForm(forms.Form):
def _get_module_bay_tree(self, module_bay):
module_bays = []
while module_bay:
module_bays.append(module_bay)
if module_bay.module:
module_bay = module_bay.module.module_bay
else:
module_bay = None
module_bays.reverse()
return module_bays
def clean(self):
super().clean()
@@ -89,7 +100,7 @@ class ModuleCommonForm(forms.Form):
self.instance._disable_replication = True
return
positions = get_module_bay_positions(module_bay)
module_bays = self._get_module_bay_tree(module_bay)
for templates, component_attribute in [
("consoleporttemplates", "consoleports"),
@@ -108,16 +119,25 @@ class ModuleCommonForm(forms.Form):
# Get the templates for the module type.
for template in getattr(module_type, templates).all():
resolved_name = template.name
# Installing modules with placeholders require that the bay has a position value
if MODULE_TOKEN in template.name:
if not module_bay.position:
raise forms.ValidationError(
_("Cannot install module with placeholder values in a module bay with no position defined.")
)
try:
resolved_name = resolve_module_placeholder(template.name, positions)
except ValueError as e:
raise forms.ValidationError(str(e))
if len(module_bays) != template.name.count(MODULE_TOKEN):
raise forms.ValidationError(
_(
"Cannot install module with placeholder values in a module bay tree {level} in tree "
"but {tokens} placeholders given."
).format(
level=len(module_bays), tokens=template.name.count(MODULE_TOKEN)
)
)
for module_bay in module_bays:
resolved_name = resolved_name.replace(MODULE_TOKEN, module_bay.position, 1)
existing_item = installed_components.get(resolved_name)

View File

@@ -9,7 +9,6 @@ from dcim.choices import *
from dcim.constants import *
from dcim.models.base import PortMappingBase
from dcim.models.mixins import InterfaceValidationMixin
from dcim.utils import get_module_bay_positions, resolve_module_placeholder
from netbox.models import ChangeLoggedModel
from utilities.fields import ColorField, NaturalOrderingField
from utilities.mptt import TreeManager
@@ -166,15 +165,31 @@ class ModularComponentTemplateModel(ComponentTemplateModel):
_("A component template must be associated with either a device type or a module type.")
)
def _get_module_tree(self, module):
modules = []
while module:
modules.append(module)
if module.module_bay:
module = module.module_bay.module
else:
module = None
modules.reverse()
return modules
def _resolve_module_placeholder(self, value, module):
if MODULE_TOKEN not in value or not module:
return value
modules = self._get_module_tree(module)
for m in modules:
value = value.replace(MODULE_TOKEN, m.module_bay.position, 1)
return value
def resolve_name(self, module):
if MODULE_TOKEN not in self.name or not module:
return self.name
return resolve_module_placeholder(self.name, get_module_bay_positions(module.module_bay))
return self._resolve_module_placeholder(self.name, module)
def resolve_label(self, module):
if MODULE_TOKEN not in self.label or not module:
return self.label
return resolve_module_placeholder(self.label, get_module_bay_positions(module.module_bay))
return self._resolve_module_placeholder(self.label, module)
class ConsolePortTemplate(ModularComponentTemplateModel):
@@ -705,9 +720,7 @@ class ModuleBayTemplate(ModularComponentTemplateModel):
verbose_name_plural = _('module bay templates')
def resolve_position(self, module):
if MODULE_TOKEN not in self.position or not module:
return self.position
return resolve_module_placeholder(self.position, get_module_bay_positions(module.module_bay))
return self._resolve_module_placeholder(self.position, module)
def instantiate(self, **kwargs):
return self.component_model(

View File

@@ -2,7 +2,7 @@ from functools import cached_property
from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation
from django.contrib.postgres.fields import ArrayField
from django.core.exceptions import ValidationError
from django.core.exceptions import ObjectDoesNotExist, ValidationError
from django.core.validators import MaxValueValidator, MinValueValidator
from django.db import models
from django.db.models import Sum
@@ -298,20 +298,24 @@ class CabledObjectModel(models.Model):
class PathEndpoint(models.Model):
"""
An abstract model inherited by any CabledObjectModel subclass which represents the end of a CablePath; specifically,
these include ConsolePort, ConsoleServerPort, PowerPort, PowerOutlet, Interface, and PowerFeed.
An abstract model inherited by any CabledObjectModel subclass which
represents the end of a CablePath; specifically, these include
ConsolePort, ConsoleServerPort, PowerPort, PowerOutlet, Interface, and
PowerFeed.
`_path` references the CablePath originating from this instance, if any. It is set or cleared by the receivers in
dcim.signals in response to changes in the cable path, and complements the `origin` GenericForeignKey field on the
CablePath model. `_path` should not be accessed directly; rather, use the `path` property.
`connected_endpoints()` is a convenience method for returning the destination of the associated CablePath, if any.
`_path` references the CablePath originating from this instance, if any.
It is set or cleared by the receivers in dcim.signals in response to
changes in the cable path, and complements the `origin` GenericForeignKey
field on the CablePath model. `_path` should not be accessed directly;
rather, use the `path` property. `connected_endpoints()` is a convenience
method for returning the destination of the associated CablePath, if any.
"""
_path = models.ForeignKey(
to='dcim.CablePath',
on_delete=models.SET_NULL,
null=True,
blank=True
blank=True,
)
class Meta:
@@ -323,39 +327,74 @@ class PathEndpoint(models.Model):
# Construct the complete path (including e.g. bridged interfaces)
while origin is not None:
if origin._path is None:
# Go through the public accessor rather than dereferencing `_path`
# directly. During cable edits, CablePath rows can be deleted and
# recreated while this endpoint instance is still in memory.
cable_path = origin.path
if cable_path is None:
break
path.extend(origin._path.path_objects)
path.extend(cable_path.path_objects)
# If the path ends at a non-connected pass-through port, pad out the link and far-end terminations
# If the path ends at a non-connected pass-through port, pad out
# the link and far-end terminations.
if len(path) % 3 == 1:
path.extend(([], []))
# If the path ends at a site or provider network, inject a null "link" to render an attachment
# If the path ends at a site or provider network, inject a null
# "link" to render an attachment.
elif len(path) % 3 == 2:
path.insert(-1, [])
# Check for a bridged relationship to continue the trace
destinations = origin._path.destinations
# Check for a bridged relationship to continue the trace.
destinations = cable_path.destinations
if len(destinations) == 1:
origin = getattr(destinations[0], 'bridge', None)
else:
origin = None
# Return the path as a list of three-tuples (A termination(s), cable(s), B termination(s))
# Return the path as a list of three-tuples
# (A termination(s), cable(s), B termination(s))
return list(zip(*[iter(path)] * 3))
@property
def path(self):
return self._path
"""
Return this endpoint's current CablePath, if any.
`_path` is a denormalized reference that is updated from CablePath
save/delete handlers, including queryset.update() calls on origin
endpoints. That means an already-instantiated endpoint can briefly hold
a stale in-memory `_path` relation while the database already points to
a different CablePath (or to no path at all).
If the cached relation points to a CablePath that has just been
deleted, refresh only the `_path` field from the database and retry.
This keeps the fix cheap and narrowly scoped to the denormalized FK.
"""
if self._path_id is None:
return None
try:
return self._path
except ObjectDoesNotExist:
# Refresh only the denormalized FK instead of the whole model.
# The expected problem here is in-memory staleness during path
# rebuilds, not persistent database corruption.
self.refresh_from_db(fields=['_path'])
return self._path if self._path_id else None
@cached_property
def connected_endpoints(self):
"""
Caching accessor for the attached CablePath's destination (if any)
Caching accessor for the attached CablePath's destinations (if any).
Always route through `path` so stale in-memory `_path` references are
repaired before we cache the result for the lifetime of this instance.
"""
return self._path.destinations if self._path else []
if cable_path := self.path:
return cable_path.destinations
return []
#

View File

@@ -5,6 +5,7 @@ from circuits.models import *
from core.models import ObjectType
from dcim.choices import *
from dcim.models import *
from extras.events import serialize_for_event
from extras.models import CustomField
from ipam.models import Prefix
from netbox.choices import WeightUnitChoices
@@ -893,77 +894,6 @@ class ModuleBayTestCase(TestCase):
nested_bay = module.modulebays.get(name='Sub-bay 1-1')
self.assertEqual(nested_bay.position, '1-1')
@tag('regression') # #20474
def test_single_module_token_at_nested_depth(self):
"""
A module type with a single {module} token should install at depth > 1
without raising a token count mismatch error, resolving to the immediate
parent bay's position.
"""
manufacturer = Manufacturer.objects.first()
site = Site.objects.first()
device_role = DeviceRole.objects.first()
device_type = DeviceType.objects.create(
manufacturer=manufacturer,
model='Chassis with Rear Card',
slug='chassis-with-rear-card'
)
ModuleBayTemplate.objects.create(
device_type=device_type,
name='Rear card slot',
position='1'
)
rear_card_type = ModuleType.objects.create(
manufacturer=manufacturer,
model='Rear Card'
)
ModuleBayTemplate.objects.create(
module_type=rear_card_type,
name='SFP slot 1',
position='1'
)
ModuleBayTemplate.objects.create(
module_type=rear_card_type,
name='SFP slot 2',
position='2'
)
sfp_type = ModuleType.objects.create(
manufacturer=manufacturer,
model='SFP Module'
)
InterfaceTemplate.objects.create(
module_type=sfp_type,
name='SFP {module}',
type=InterfaceTypeChoices.TYPE_10GE_SFP_PLUS
)
device = Device.objects.create(
name='Test Chassis',
device_type=device_type,
role=device_role,
site=site
)
rear_card_bay = device.modulebays.get(name='Rear card slot')
rear_card = Module.objects.create(
device=device,
module_bay=rear_card_bay,
module_type=rear_card_type
)
sfp_bay = rear_card.modulebays.get(name='SFP slot 2')
sfp_module = Module.objects.create(
device=device,
module_bay=sfp_bay,
module_type=sfp_type
)
interface = sfp_module.interfaces.first()
self.assertEqual(interface.name, 'SFP 2')
@tag('regression') # #20912
def test_module_bay_parent_cleared_when_module_removed(self):
"""Test that the parent field is properly cleared when a module bay's module assignment is removed"""
@@ -1345,6 +1275,65 @@ class CableTestCase(TestCase):
self.assertEqual(a_terms, [interface1])
self.assertEqual(b_terms, [interface2])
@tag('regression') # #21498
def test_path_refreshes_replaced_cablepath_reference(self):
"""
An already-instantiated interface should refresh its denormalized
`_path` foreign key when the referenced CablePath row has been
replaced in the database.
"""
stale_interface = Interface.objects.get(device__name='TestDevice1', name='eth0')
old_path = CablePath.objects.get(pk=stale_interface._path_id)
new_path = CablePath(
path=old_path.path,
is_active=old_path.is_active,
is_complete=old_path.is_complete,
is_split=old_path.is_split,
)
old_path_id = old_path.pk
old_path.delete()
new_path.save()
# The old CablePath no longer exists
self.assertFalse(CablePath.objects.filter(pk=old_path_id).exists())
# The already-instantiated interface still points to the deleted path
# until the accessor refreshes `_path` from the database.
self.assertEqual(stale_interface._path_id, old_path_id)
self.assertEqual(stale_interface.path.pk, new_path.pk)
@tag('regression') # #21498
def test_serialize_for_event_handles_stale_cablepath_reference_after_retermination(self):
"""
Serializing an interface whose previously cached `_path` row has been
deleted during cable retermination must not raise.
"""
stale_interface = Interface.objects.get(device__name='TestDevice2', name='eth0')
old_path_id = stale_interface._path_id
new_peer = Interface.objects.get(device__name='TestDevice2', name='eth1')
cable = stale_interface.cable
self.assertIsNotNone(cable)
self.assertIsNotNone(old_path_id)
self.assertEqual(stale_interface.cable_end, 'B')
cable.b_terminations = [new_peer]
cable.save()
# The old CablePath was deleted during retrace.
self.assertFalse(CablePath.objects.filter(pk=old_path_id).exists())
# The stale in-memory instance still holds the deleted FK value.
self.assertEqual(stale_interface._path_id, old_path_id)
# Serialization must not raise ObjectDoesNotExist. Because this interface
# was the former B-side termination, it is now disconnected.
data = serialize_for_event(stale_interface)
self.assertIsNone(data['connected_endpoints'])
self.assertIsNone(data['connected_endpoints_type'])
self.assertFalse(data['connected_endpoints_reachable'])
class VirtualDeviceContextTestCase(TestCase):

View File

@@ -420,23 +420,14 @@ class VirtualChassisMembersPanel(panels.ObjectPanel):
"""
A panel which lists all members of a virtual chassis.
"""
template_name = 'dcim/panels/virtual_chassis_members.html'
title = _('Virtual Chassis Members')
actions = [
actions.AddObject(
'dcim.device',
url_params={
'site': lambda ctx: (
ctx['virtual_chassis'].master.site_id
if ctx['virtual_chassis'] and ctx['virtual_chassis'].master_id
else ''
),
'rack': lambda ctx: (
ctx['virtual_chassis'].master.rack_id
if ctx['virtual_chassis'] and ctx['virtual_chassis'].master_id
else ''
),
'site': lambda ctx: ctx['object'].master.site_id if ctx['object'].master else '',
'rack': lambda ctx: ctx['object'].master.rack_id if ctx['object'].master else '',
},
),
]
@@ -541,7 +532,7 @@ class VirtualCircuitPanel(panels.ObjectPanel):
def render(self, context):
obj = context.get('object')
if not obj or not obj.is_virtual or not hasattr(obj, 'virtual_circuit_termination'):
if not obj or not obj.is_virtual or not obj.virtual_circuit_termination:
return ''
ctx = self.get_context(context)
return render_to_string(self.template_name, ctx, request=ctx.get('request'))

View File

@@ -3,9 +3,6 @@ from collections import defaultdict
from django.apps import apps
from django.contrib.contenttypes.models import ContentType
from django.db import router, transaction
from django.utils.translation import gettext as _
from dcim.constants import MODULE_TOKEN
def compile_path_node(ct_id, object_id):
@@ -36,51 +33,6 @@ def path_node_to_object(repr):
return ct.model_class().objects.filter(pk=object_id).first()
def get_module_bay_positions(module_bay):
"""
Given a module bay, traverse up the module hierarchy and return
a list of bay position strings from root to leaf.
"""
positions = []
while module_bay:
positions.append(module_bay.position)
if module_bay.module:
module_bay = module_bay.module.module_bay
else:
module_bay = None
positions.reverse()
return positions
def resolve_module_placeholder(value, positions):
"""
Resolve {module} placeholder tokens in a string using the given
list of module bay positions (ordered root to leaf).
A single {module} token resolves to the leaf (immediate parent) bay's position.
Multiple tokens must match the tree depth and resolve level-by-level.
Returns the resolved string.
Raises ValueError if token count is greater than 1 and doesn't match tree depth.
"""
if MODULE_TOKEN not in value:
return value
token_count = value.count(MODULE_TOKEN)
if token_count == 1:
return value.replace(MODULE_TOKEN, positions[-1])
if token_count == len(positions):
for pos in positions:
value = value.replace(MODULE_TOKEN, pos, 1)
return value
raise ValueError(
_("Cannot install module with placeholder values in a module bay tree "
"{level} levels deep but {tokens} placeholders given.").format(
level=len(positions), tokens=token_count
)
)
def create_cablepaths(objects):
"""
Create CablePaths for all paths originating from the specified set of nodes.

View File

@@ -2,7 +2,7 @@ from django.utils.translation import gettext as _
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import extend_schema_field
from rest_framework.fields import Field
from rest_framework.serializers import ListSerializer, ValidationError
from rest_framework.serializers import ValidationError
from extras.choices import CustomFieldTypeChoices
from extras.constants import CUSTOMFIELD_EMPTY_VALUES
@@ -49,25 +49,8 @@ class CustomFieldsDataField(Field):
# TODO: Fix circular import
from utilities.api import get_serializer_for_model
data = {}
cache = self.parent.context.get('cf_object_cache')
for cf in self._get_custom_fields():
if cache is not None and cf.type in (
CustomFieldTypeChoices.TYPE_OBJECT,
CustomFieldTypeChoices.TYPE_MULTIOBJECT,
):
raw = obj.get(cf.name)
if raw is None:
value = None
elif cf.type == CustomFieldTypeChoices.TYPE_OBJECT:
model = cf.related_object_type.model_class()
value = cache.get((model, raw))
else:
model = cf.related_object_type.model_class()
value = [cache[(model, pk)] for pk in raw if (model, pk) in cache] or None
else:
value = cf.deserialize(obj.get(cf.name))
value = cf.deserialize(obj.get(cf.name))
if value is not None and cf.type == CustomFieldTypeChoices.TYPE_OBJECT:
serializer = get_serializer_for_model(cf.related_object_type.model_class())
value = serializer(value, nested=True, context=self.parent.context).data
@@ -104,32 +87,3 @@ class CustomFieldsDataField(Field):
data = {**self.parent.instance.custom_field_data, **data}
return data
class CustomFieldListSerializer(ListSerializer):
"""
ListSerializer that pre-fetches all OBJECT/MULTIOBJECT custom field related objects
in bulk before per-item serialization.
"""
def to_representation(self, data):
cf_field = self.child.fields.get('custom_fields')
if isinstance(cf_field, CustomFieldsDataField):
object_type_cfs = [
cf for cf in cf_field._get_custom_fields()
if cf.type in (CustomFieldTypeChoices.TYPE_OBJECT, CustomFieldTypeChoices.TYPE_MULTIOBJECT)
]
cache = {}
for cf in object_type_cfs:
model = cf.related_object_type.model_class()
pks = set()
for item in data:
raw = item.custom_field_data.get(cf.name)
if raw is not None:
if cf.type == CustomFieldTypeChoices.TYPE_MULTIOBJECT:
pks.update(raw)
else:
pks.add(raw)
for obj in model.objects.filter(pk__in=pks):
cache[(model, obj.pk)] = obj
self.child.context['cf_object_cache'] = cache
return super().to_representation(data)

View File

@@ -1,70 +1,19 @@
import logging
from django.core.files.storage import storages
from django.db import IntegrityError
from django.utils.translation import gettext_lazy as _
from django.utils.translation import gettext as _
from drf_spectacular.utils import extend_schema_field
from rest_framework import serializers
from core.api.serializers_.jobs import JobSerializer
from core.choices import ManagedFileRootPathChoices
from extras.models import Script, ScriptModule
from extras.models import Script
from netbox.api.serializers import ValidatedModelSerializer
from utilities.datetime import local_now
logger = logging.getLogger(__name__)
__all__ = (
'ScriptDetailSerializer',
'ScriptInputSerializer',
'ScriptModuleSerializer',
'ScriptSerializer',
)
class ScriptModuleSerializer(ValidatedModelSerializer):
file = serializers.FileField(write_only=True)
file_path = serializers.CharField(read_only=True)
class Meta:
model = ScriptModule
fields = ['id', 'display', 'file_path', 'file', 'created', 'last_updated']
brief_fields = ('id', 'display')
def validate(self, data):
# ScriptModule.save() sets file_root; inject it here so full_clean() succeeds.
# Pop 'file' before model instantiation — ScriptModule has no such field.
file = data.pop('file', None)
data['file_root'] = ManagedFileRootPathChoices.SCRIPTS
data = super().validate(data)
data.pop('file_root', None)
if file is not None:
data['file'] = file
return data
def create(self, validated_data):
file = validated_data.pop('file')
storage = storages.create_storage(storages.backends["scripts"])
validated_data['file_path'] = storage.save(file.name, file)
created = False
try:
instance = super().create(validated_data)
created = True
return instance
except IntegrityError as e:
if 'file_path' in str(e):
raise serializers.ValidationError(
_("A script module with this file name already exists.")
)
raise
finally:
if not created and (file_path := validated_data.get('file_path')):
try:
storage.delete(file_path)
except Exception:
logger.warning(f"Failed to delete orphaned script file '{file_path}' from storage.")
class ScriptSerializer(ValidatedModelSerializer):
description = serializers.SerializerMethodField(read_only=True)
vars = serializers.SerializerMethodField(read_only=True)

View File

@@ -26,7 +26,6 @@ router.register('journal-entries', views.JournalEntryViewSet)
router.register('config-contexts', views.ConfigContextViewSet)
router.register('config-context-profiles', views.ConfigContextProfileViewSet)
router.register('config-templates', views.ConfigTemplateViewSet)
router.register('scripts/upload', views.ScriptModuleViewSet)
router.register('scripts', views.ScriptViewSet, basename='script')
app_name = 'extras-api'

View File

@@ -6,7 +6,7 @@ from rest_framework import status
from rest_framework.decorators import action
from rest_framework.exceptions import PermissionDenied
from rest_framework.generics import RetrieveUpdateDestroyAPIView
from rest_framework.mixins import CreateModelMixin, ListModelMixin, RetrieveModelMixin
from rest_framework.mixins import ListModelMixin, RetrieveModelMixin
from rest_framework.renderers import JSONRenderer
from rest_framework.response import Response
from rest_framework.routers import APIRootView
@@ -21,7 +21,6 @@ from netbox.api.features import SyncedDataMixin
from netbox.api.metadata import ContentTypeMetadata
from netbox.api.renderers import TextRenderer
from netbox.api.viewsets import BaseViewSet, NetBoxModelViewSet
from netbox.api.viewsets.mixins import ObjectValidationMixin
from utilities.exceptions import RQWorkerNotRunningException
from utilities.request import copy_safe_request
@@ -265,11 +264,6 @@ class ConfigTemplateViewSet(SyncedDataMixin, ConfigTemplateRenderMixin, NetBoxMo
# Scripts
#
class ScriptModuleViewSet(ObjectValidationMixin, CreateModelMixin, BaseViewSet):
queryset = ScriptModule.objects.all()
serializer_class = serializers.ScriptModuleSerializer
@extend_schema_view(
update=extend_schema(request=serializers.ScriptInputSerializer),
partial_update=extend_schema(request=serializers.ScriptInputSerializer),

View File

@@ -25,22 +25,59 @@ logger = logging.getLogger('netbox.events_processor')
class EventContext(UserDict):
"""
A custom dictionary that automatically serializes its associated object on demand.
Dictionary-compatible wrapper for queued events that lazily serializes
``event['data']`` on first access.
Backward-compatible with the plain-dict interface expected by existing
EVENTS_PIPELINE consumers. When the same object is enqueued more than once
in a single request, the serialization source is updated so consumers see
the latest state.
"""
# We're emulating a dictionary here (rather than using a custom class) because prior to NetBox v4.5.2, events were
# queued as dictionaries for processing by handles in EVENTS_PIPELINE. We need to avoid introducing any breaking
# changes until a suitable minor release.
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Track which model instance should be serialized if/when `data` is
# requested. This may be refreshed on duplicate enqueue, while leaving
# the public `object` entry untouched for compatibility.
self._serialization_source = self.get('object')
def refresh_serialization_source(self, instance):
"""
Point lazy serialization at a fresher instance, invalidating any
already-materialized ``data``.
"""
self._serialization_source = instance
# UserDict.__contains__ checks the backing dict directly, so `in`
# does not trigger __getitem__'s lazy serialization.
if 'data' in self:
del self['data']
def freeze_data(self, instance):
"""
Eagerly serialize and cache the payload for delete events, where the
object may become inaccessible after deletion.
"""
super().__setitem__('data', serialize_for_event(instance))
self._serialization_source = None
def __getitem__(self, item):
if item == 'data' and 'data' not in self:
data = serialize_for_event(self['object'])
self.__setitem__('data', data)
# Materialize the payload only when an event consumer asks for it.
#
# On coalesced events, use the latest explicitly queued instance so
# webhooks/scripts/notifications observe the final queued state for
# that object within the request.
source = self._serialization_source or super().__getitem__('object')
super().__setitem__('data', serialize_for_event(source))
return super().__getitem__(item)
def serialize_for_event(instance):
"""
Return a serialized representation of the given instance suitable for use in a queued event.
Return a serialized representation of the given instance suitable for use
in a queued event.
"""
serializer_class = get_serializer_for_model(instance.__class__)
serializer_context = {
@@ -53,7 +90,8 @@ def serialize_for_event(instance):
def get_snapshots(instance, event_type):
"""
Return a dictionary of pre- and post-change snapshots for the given instance.
Return a dictionary of pre- and post-change snapshots for the given
instance.
"""
if event_type == OBJECT_DELETED:
# Post-change snapshot must be empty for deleted objects
@@ -76,8 +114,9 @@ def get_snapshots(instance, event_type):
def enqueue_event(queue, instance, request, event_type):
"""
Enqueue a serialized representation of a created/updated/deleted object for the processing of
events once the request has completed.
Enqueue (or coalesce) an event for a created/updated/deleted object.
Events are processed after the request completes.
"""
# Bail if this type of object does not support event rules
if not has_feature(instance, 'event_rules'):
@@ -88,11 +127,18 @@ def enqueue_event(queue, instance, request, event_type):
assert instance.pk is not None
key = f'{app_label}.{model_name}:{instance.pk}'
if key in queue:
queue[key]['snapshots']['postchange'] = get_snapshots(instance, event_type)['postchange']
# If the object is being deleted, update any prior "update" event to "delete"
# If the object is being deleted, convert any prior update event into a
# delete event and freeze the payload before the object (or related
# rows) become inaccessible.
if event_type == OBJECT_DELETED:
queue[key]['event_type'] = event_type
else:
# Keep the public `object` entry stable for compatibility.
queue[key].refresh_serialization_source(instance)
else:
queue[key] = EventContext(
object_type=ObjectType.objects.get_for_model(instance),
@@ -106,14 +152,16 @@ def enqueue_event(queue, instance, request, event_type):
username=request.user.username, # DEPRECATED, will be removed in NetBox v4.7.0
request_id=request.id, # DEPRECATED, will be removed in NetBox v4.7.0
)
# Force serialization of objects prior to them actually being deleted
# For delete events, eagerly serialize the payload before the row is gone.
# This covers both first-time enqueues and coalesced update→delete promotions.
if event_type == OBJECT_DELETED:
queue[key]['data'] = serialize_for_event(instance)
queue[key].freeze_data(instance)
def process_event_rules(event_rules, object_type, event):
"""
Process a list of EventRules against an event.
Evaluate and dispatch a list of EventRules against an event.
Notes on event sources:
- Object change events (created/updated/deleted) are enqueued via
@@ -133,9 +181,9 @@ def process_event_rules(event_rules, object_type, event):
if not event_rule.eval_conditions(event['data']):
continue
# Compile event data
event_data = event_rule.action_data or {}
event_data.update(event['data'])
# Merge rule-specific action_data with the event payload.
# Copy to avoid mutating the rule's stored action_data dict.
event_data = {**(event_rule.action_data or {}), **event['data']}
# Webhooks
if event_rule.action_type == EventRuleActionChoices.WEBHOOK:

View File

@@ -74,7 +74,7 @@ class CustomFieldManager(models.Manager.from_queryset(RestrictedQuerySet)):
return custom_fields
content_type = ObjectType.objects.get_for_model(model._meta.concrete_model)
custom_fields = self.get_queryset().filter(object_types=content_type).select_related('related_object_type')
custom_fields = self.get_queryset().filter(object_types=content_type)
# Populate the request cache to avoid redundant lookups
if cache is not None:

View File

@@ -1,9 +1,7 @@
import datetime
import hashlib
from unittest.mock import MagicMock, patch
from django.contrib.contenttypes.models import ContentType
from django.core.files.uploadedfile import SimpleUploadedFile
from django.urls import reverse
from django.utils.timezone import make_aware, now
from rest_framework import status
@@ -1386,54 +1384,3 @@ class NotificationTest(APIViewTestCases.APIViewTestCase):
'event_type': OBJECT_DELETED,
},
]
class ScriptModuleTest(APITestCase):
"""
Tests for the POST /api/extras/scripts/upload/ endpoint.
ScriptModule is a proxy of core.ManagedFile (a different app) so the standard
APIViewTestCases mixins cannot be used directly. All tests use add_permissions()
with explicit Django model-level permissions.
"""
def setUp(self):
super().setUp()
self.url = reverse('extras-api:scriptmodule-list') # /api/extras/scripts/upload/
def test_upload_script_module_without_permission(self):
script_content = b"from extras.scripts import Script\nclass TestScript(Script):\n pass\n"
upload_file = SimpleUploadedFile('test_upload.py', script_content, content_type='text/plain')
response = self.client.post(
self.url,
{'file': upload_file},
format='multipart',
**self.header,
)
self.assertHttpStatus(response, status.HTTP_403_FORBIDDEN)
def test_upload_script_module(self):
# ScriptModule is a proxy of core.ManagedFile; both permissions required.
self.add_permissions('extras.add_scriptmodule', 'core.add_managedfile')
script_content = b"from extras.scripts import Script\nclass TestScript(Script):\n pass\n"
upload_file = SimpleUploadedFile('test_upload.py', script_content, content_type='text/plain')
mock_storage = MagicMock()
mock_storage.save.return_value = 'test_upload.py'
with patch('extras.api.serializers_.scripts.storages') as mock_storages:
mock_storages.create_storage.return_value = mock_storage
mock_storages.backends = {'scripts': {}}
response = self.client.post(
self.url,
{'file': upload_file},
format='multipart',
**self.header,
)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(response.data['file_path'], 'test_upload.py')
mock_storage.save.assert_called_once()
self.assertTrue(ScriptModule.objects.filter(file_path='test_upload.py').exists())
def test_upload_script_module_without_file_fails(self):
self.add_permissions('extras.add_scriptmodule', 'core.add_managedfile')
response = self.client.post(self.url, {}, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)

View File

@@ -1,8 +1,10 @@
import json
import uuid
from unittest import skipIf
from unittest.mock import Mock, patch
import django_rq
from django.conf import settings
from django.http import HttpResponse
from django.test import RequestFactory
from django.urls import reverse
@@ -343,6 +345,7 @@ class EventRuleTest(APITestCase):
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], sites[i].name)
self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
@skipIf('netbox.tests.dummy_plugin' not in settings.PLUGINS, 'dummy_plugin not in settings.PLUGINS')
def test_send_webhook(self):
request_id = uuid.uuid4()
@@ -426,6 +429,97 @@ class EventRuleTest(APITestCase):
self.assertEqual(job.kwargs['object_type'], script_type)
self.assertEqual(job.kwargs['username'], self.user.username)
def test_duplicate_enqueue_refreshes_lazy_payload(self):
"""
When the same object is enqueued more than once in a single request,
lazy serialization should use the most recently enqueued instance while
preserving the original event['object'] reference.
"""
request = RequestFactory().get(reverse('dcim:site_add'))
request.id = uuid.uuid4()
request.user = self.user
site = Site.objects.create(name='Site 1', slug='site-1')
stale_site = Site.objects.get(pk=site.pk)
queue = {}
enqueue_event(queue, stale_site, request, OBJECT_UPDATED)
event = queue[f'dcim.site:{site.pk}']
# Data should not be materialized yet (lazy serialization)
self.assertNotIn('data', event.data)
fresh_site = Site.objects.get(pk=site.pk)
fresh_site.description = 'foo'
fresh_site.save()
enqueue_event(queue, fresh_site, request, OBJECT_UPDATED)
# The original object reference should be preserved
self.assertIs(event['object'], stale_site)
# But serialized data should reflect the fresher instance
self.assertEqual(event['data']['description'], 'foo')
self.assertEqual(event['snapshots']['postchange']['description'], 'foo')
def test_duplicate_enqueue_invalidates_materialized_data(self):
"""
If event['data'] has already been materialized before a second enqueue
for the same object, the stale payload should be discarded and rebuilt
from the fresher instance on next access.
"""
request = RequestFactory().get(reverse('dcim:site_add'))
request.id = uuid.uuid4()
request.user = self.user
site = Site.objects.create(name='Site 1', slug='site-1')
queue = {}
enqueue_event(queue, site, request, OBJECT_UPDATED)
event = queue[f'dcim.site:{site.pk}']
# Force early materialization
self.assertEqual(event['data']['description'], '')
# Now update and re-enqueue
fresh_site = Site.objects.get(pk=site.pk)
fresh_site.description = 'updated'
fresh_site.save()
enqueue_event(queue, fresh_site, request, OBJECT_UPDATED)
# Stale data should have been invalidated; new access should reflect update
self.assertEqual(event['data']['description'], 'updated')
def test_update_then_delete_enqueue_freezes_payload(self):
"""
When an update event is coalesced with a subsequent delete, the event
type should be promoted to OBJECT_DELETED and the payload should be
eagerly frozen (since the object will be inaccessible after deletion).
"""
request = RequestFactory().get(reverse('dcim:site_add'))
request.id = uuid.uuid4()
request.user = self.user
site = Site.objects.create(name='Site 1', slug='site-1')
queue = {}
enqueue_event(queue, site, request, OBJECT_UPDATED)
event = queue[f'dcim.site:{site.pk}']
enqueue_event(queue, site, request, OBJECT_DELETED)
# Event type should have been promoted
self.assertEqual(event['event_type'], OBJECT_DELETED)
# Data should already be materialized (frozen), not lazy
self.assertIn('data', event.data)
self.assertEqual(event['data']['name'], 'Site 1')
self.assertIsNone(event['snapshots']['postchange'])
def test_duplicate_triggers(self):
"""
Test for erroneous duplicate event triggers resulting from saving an object multiple times

View File

@@ -159,11 +159,9 @@ class Aggregate(ContactsMixin, GetAvailablePrefixesMixin, PrimaryModel):
@property
def family(self):
if not self.prefix:
return None
if isinstance(self.prefix, str):
return netaddr.IPNetwork(self.prefix).version
return self.prefix.version
if self.prefix:
return self.prefix.version
return None
@property
def ipv6_full(self):
@@ -337,19 +335,11 @@ class Prefix(ContactsMixin, GetAvailablePrefixesMixin, CachedScopeMixin, Primary
@property
def family(self):
if not self.prefix:
return None
if isinstance(self.prefix, str):
return netaddr.IPNetwork(self.prefix).version
return self.prefix.version
return self.prefix.version if self.prefix else None
@property
def mask_length(self):
if not self.prefix:
return None
if isinstance(self.prefix, str):
return netaddr.IPNetwork(self.prefix).prefixlen
return self.prefix.prefixlen
return self.prefix.prefixlen if self.prefix else None
@property
def ipv6_full(self):
@@ -652,11 +642,7 @@ class IPRange(ContactsMixin, PrimaryModel):
@property
def family(self):
if not self.start_address:
return None
if isinstance(self.start_address, str):
return netaddr.IPAddress(self.start_address.split('/')[0]).version
return self.start_address.version
return self.start_address.version if self.start_address else None
@property
def range(self):
@@ -1004,11 +990,9 @@ class IPAddress(ContactsMixin, PrimaryModel):
@property
def family(self):
if not self.address:
return None
if isinstance(self.address, str):
return netaddr.IPNetwork(self.address).version
return self.address.version
if self.address:
return self.address.version
return None
@property
def is_oob_ip(self):

View File

@@ -11,13 +11,6 @@ from utilities.data import string_to_ranges
class TestAggregate(TestCase):
def test_family_string(self):
# Test property when prefix is a string
agg = Aggregate(prefix='10.0.0.0/8')
self.assertEqual(agg.family, 4)
agg_v6 = Aggregate(prefix='2001:db8::/32')
self.assertEqual(agg_v6.family, 6)
def test_get_utilization(self):
rir = RIR.objects.create(name='RIR 1', slug='rir-1')
aggregate = Aggregate(prefix=IPNetwork('10.0.0.0/8'), rir=rir)
@@ -47,13 +40,6 @@ class TestAggregate(TestCase):
class TestIPRange(TestCase):
def test_family_string(self):
# Test property when start_address is a string
ip_range = IPRange(start_address='10.0.0.1/24', end_address='10.0.0.254/24')
self.assertEqual(ip_range.family, 4)
ip_range_v6 = IPRange(start_address='2001:db8::1/64', end_address='2001:db8::ffff/64')
self.assertEqual(ip_range_v6.family, 6)
def test_overlapping_range(self):
iprange_192_168 = IPRange.objects.create(
start_address=IPNetwork('192.168.0.1/22'), end_address=IPNetwork('192.168.0.49/22')
@@ -104,20 +90,6 @@ class TestIPRange(TestCase):
class TestPrefix(TestCase):
def test_family_string(self):
# Test property when prefix is a string
prefix = Prefix(prefix='10.0.0.0/8')
self.assertEqual(prefix.family, 4)
prefix_v6 = Prefix(prefix='2001:db8::/32')
self.assertEqual(prefix_v6.family, 6)
def test_mask_length_string(self):
# Test property when prefix is a string
prefix = Prefix(prefix='10.0.0.0/8')
self.assertEqual(prefix.mask_length, 8)
prefix_v6 = Prefix(prefix='2001:db8::/32')
self.assertEqual(prefix_v6.mask_length, 32)
def test_get_duplicates(self):
prefixes = Prefix.objects.bulk_create((
Prefix(prefix=IPNetwork('192.0.2.0/24')),
@@ -561,13 +533,6 @@ class TestPrefixHierarchy(TestCase):
class TestIPAddress(TestCase):
def test_family_string(self):
# Test property when address is a string
ip = IPAddress(address='10.0.0.1/24')
self.assertEqual(ip.family, 4)
ip_v6 = IPAddress(address='2001:db8::1/64')
self.assertEqual(ip_v6.family, 6)
def test_get_duplicates(self):
ips = IPAddress.objects.bulk_create((
IPAddress(address=IPNetwork('192.0.2.1/24')),

View File

@@ -1,7 +1,7 @@
from rest_framework import serializers
from rest_framework.fields import CreateOnlyDefault
from extras.api.customfields import CustomFieldDefaultValues, CustomFieldListSerializer, CustomFieldsDataField
from extras.api.customfields import CustomFieldDefaultValues, CustomFieldsDataField
from .base import ValidatedModelSerializer
from .nested import NestedTagSerializer
@@ -23,29 +23,6 @@ class CustomFieldModelSerializer(serializers.Serializer):
default=CreateOnlyDefault(CustomFieldDefaultValues())
)
@classmethod
def many_init(cls, *args, **kwargs):
"""
We can't call super().many_init() and change the outcome because by the time it returns,
the plain ListSerializer is already instantiated.
Because every NetBox serializer defines its own Meta which doesn't inherit from a parent Meta,
this would silently not apply to any real serializer.
Thats why this method replicates many_init from parent and changed the default value for list_serializer_class.
"""
list_kwargs = {}
for key in serializers.LIST_SERIALIZER_KWARGS_REMOVE:
value = kwargs.pop(key, None)
if value is not None:
list_kwargs[key] = value
list_kwargs['child'] = cls(*args, **kwargs)
list_kwargs.update({
key: value for key, value in kwargs.items()
if key in serializers.LIST_SERIALIZER_KWARGS
})
meta = getattr(cls, 'Meta', None)
list_serializer_class = getattr(meta, 'list_serializer_class', CustomFieldListSerializer)
return list_serializer_class(*args, **list_kwargs)
class TaggableModelSerializer(serializers.Serializer):
"""

View File

@@ -159,7 +159,7 @@ class BaseTable(tables.Table):
columns = None
ordering = None
if request.user.is_authenticated and self.prefixed_order_by_field in request.GET:
if self.prefixed_order_by_field in request.GET:
if request.GET[self.prefixed_order_by_field]:
# If an ordering has been specified as a query parameter, save it as the
# user's preferred ordering for this table.

View File

@@ -1,4 +1,3 @@
from django.contrib.auth.models import AnonymousUser
from django.template import Context, Template
from django.test import RequestFactory, TestCase
@@ -47,16 +46,6 @@ class BaseTableTest(TestCase):
prefetch_lookups = table.data.data._prefetch_related_lookups
self.assertEqual(prefetch_lookups, tuple())
def test_configure_anonymous_user_with_ordering(self):
"""
Verify that table.configure() does not raise an error when an anonymous
user sorts a table column.
"""
request = RequestFactory().get('/?sort=name')
request.user = AnonymousUser()
table = DeviceTable(Device.objects.all())
table.configure(request)
class TagColumnTable(NetBoxTable):
tags = columns.TagColumn(url_name='dcim:site_list')

File diff suppressed because it is too large Load Diff