mirror of
https://github.com/netbox-community/netbox.git
synced 2026-04-02 15:37:18 +02:00
Compare commits
1 Commits
main
...
21498-even
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c720b07538 |
@@ -384,18 +384,6 @@ A calendar date. Returns a `datetime.date` object.
|
||||
|
||||
A complete date & time. Returns a `datetime.datetime` object.
|
||||
|
||||
## Uploading Scripts via the API
|
||||
|
||||
Script modules can be uploaded to NetBox via the REST API by sending a `multipart/form-data` POST request to `/api/extras/scripts/upload/`. The caller must have the `extras.add_scriptmodule` and `core.add_managedfile` permissions.
|
||||
|
||||
```no-highlight
|
||||
curl -X POST \
|
||||
-H "Authorization: Token $TOKEN" \
|
||||
-H "Accept: application/json; indent=4" \
|
||||
-F "file=@/path/to/myscript.py" \
|
||||
http://netbox/api/extras/scripts/upload/
|
||||
```
|
||||
|
||||
## Running Custom Scripts
|
||||
|
||||
!!! note
|
||||
|
||||
@@ -1,14 +1,12 @@
|
||||
# Search
|
||||
|
||||
Plugins can define and register their own models to extend NetBox's core search functionality. Typically, a plugin will include a file named `search.py`, which holds all search indexes for its models.
|
||||
Plugins can define and register their own models to extend NetBox's core search functionality. Typically, a plugin will include a file named `search.py`, which holds all search indexes for its models (see the example below).
|
||||
|
||||
```python title="search.py"
|
||||
```python
|
||||
# search.py
|
||||
from netbox.search import SearchIndex, register_search
|
||||
|
||||
from netbox.search import SearchIndex
|
||||
from .models import MyModel
|
||||
|
||||
@register_search
|
||||
class MyModelIndex(SearchIndex):
|
||||
model = MyModel
|
||||
fields = (
|
||||
@@ -19,11 +17,15 @@ class MyModelIndex(SearchIndex):
|
||||
display_attrs = ('site', 'device', 'status', 'description')
|
||||
```
|
||||
|
||||
Decorate each `SearchIndex` subclass with `@register_search` to register it with NetBox. When using the default `search.py` module, no additional `indexes = [...]` list is required.
|
||||
Fields listed in `display_attrs` will not be cached for search, but will be displayed alongside the object when it appears in global search results. This is helpful for conveying to the user additional information about an object.
|
||||
|
||||
Fields listed in `display_attrs` are not cached for matching, but they are displayed alongside the object in global search results to provide additional context.
|
||||
To register one or more indexes with NetBox, define a list named `indexes` at the end of this file:
|
||||
|
||||
```python
|
||||
indexes = [MyModelIndex]
|
||||
```
|
||||
|
||||
!!! tip
|
||||
The legacy `indexes = [...]` list remains supported via `PluginConfig.search_indexes` for backward compatibility and custom loading patterns.
|
||||
The path to the list of search indexes can be modified by setting `search_indexes` in the PluginConfig instance.
|
||||
|
||||
::: netbox.search.SearchIndex
|
||||
|
||||
@@ -38,7 +38,19 @@ class ConnectedEndpointsSerializer(serializers.ModelSerializer):
|
||||
|
||||
@extend_schema_field(serializers.BooleanField)
|
||||
def get_connected_endpoints_reachable(self, obj):
|
||||
return obj._path and obj._path.is_complete and obj._path.is_active
|
||||
"""
|
||||
Determines if the connected endpoints are reachable through a cable path.
|
||||
|
||||
This method checks whether there is a valid and active cable path that
|
||||
connects the endpoints. It evaluates both the completeness and active
|
||||
status of the path to determine reachability.
|
||||
"""
|
||||
# Use the public `path` accessor rather than dereferencing `_path`
|
||||
# directly. `path` already handles the stale in-memory relation case
|
||||
# that can occur while CablePath rows are rebuilt during cable edits.
|
||||
if path := obj.path:
|
||||
return path.is_complete and path.is_active
|
||||
return False
|
||||
|
||||
|
||||
class PortSerializer(serializers.ModelSerializer):
|
||||
|
||||
@@ -6,9 +6,8 @@ from drf_spectacular.utils import extend_schema_field
|
||||
from rest_framework import serializers
|
||||
|
||||
from dcim.choices import *
|
||||
from dcim.constants import MACADDRESS_ASSIGNMENT_MODELS, MODULE_TOKEN
|
||||
from dcim.constants import MACADDRESS_ASSIGNMENT_MODELS
|
||||
from dcim.models import Device, DeviceBay, MACAddress, Module, VirtualDeviceContext
|
||||
from dcim.utils import get_module_bay_positions, resolve_module_placeholder
|
||||
from extras.api.serializers_.configtemplates import ConfigTemplateSerializer
|
||||
from ipam.api.serializers_.ip import IPAddressSerializer
|
||||
from netbox.api.fields import ChoiceField, ContentTypeField, RelatedObjectCountField
|
||||
@@ -160,60 +159,6 @@ class ModuleSerializer(PrimaryModelSerializer):
|
||||
]
|
||||
brief_fields = ('id', 'url', 'display', 'device', 'module_bay', 'module_type', 'description')
|
||||
|
||||
def validate(self, data):
|
||||
data = super().validate(data)
|
||||
|
||||
if self.nested:
|
||||
return data
|
||||
|
||||
# Skip validation for existing modules (updates)
|
||||
if self.instance is not None:
|
||||
return data
|
||||
|
||||
module_bay = data.get('module_bay')
|
||||
module_type = data.get('module_type')
|
||||
device = data.get('device')
|
||||
|
||||
if not all((module_bay, module_type, device)):
|
||||
return data
|
||||
|
||||
positions = get_module_bay_positions(module_bay)
|
||||
|
||||
for templates, component_attribute in [
|
||||
("consoleporttemplates", "consoleports"),
|
||||
("consoleserverporttemplates", "consoleserverports"),
|
||||
("interfacetemplates", "interfaces"),
|
||||
("powerporttemplates", "powerports"),
|
||||
("poweroutlettemplates", "poweroutlets"),
|
||||
("rearporttemplates", "rearports"),
|
||||
("frontporttemplates", "frontports"),
|
||||
]:
|
||||
installed_components = {
|
||||
component.name: component for component in getattr(device, component_attribute).all()
|
||||
}
|
||||
|
||||
for template in getattr(module_type, templates).all():
|
||||
resolved_name = template.name
|
||||
if MODULE_TOKEN in template.name:
|
||||
if not module_bay.position:
|
||||
raise serializers.ValidationError(
|
||||
_("Cannot install module with placeholder values in a module bay with no position defined.")
|
||||
)
|
||||
try:
|
||||
resolved_name = resolve_module_placeholder(template.name, positions)
|
||||
except ValueError as e:
|
||||
raise serializers.ValidationError(str(e))
|
||||
|
||||
if resolved_name in installed_components:
|
||||
raise serializers.ValidationError(
|
||||
_("A {model} named {name} already exists").format(
|
||||
model=template.component_model.__name__,
|
||||
name=resolved_name
|
||||
)
|
||||
)
|
||||
|
||||
return data
|
||||
|
||||
|
||||
class MACAddressSerializer(PrimaryModelSerializer):
|
||||
assigned_object_type = ContentTypeField(
|
||||
|
||||
@@ -3,7 +3,6 @@ from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from dcim.choices import *
|
||||
from dcim.constants import *
|
||||
from dcim.utils import get_module_bay_positions, resolve_module_placeholder
|
||||
from utilities.forms import get_field_value
|
||||
|
||||
__all__ = (
|
||||
@@ -71,6 +70,18 @@ class InterfaceCommonForm(forms.Form):
|
||||
|
||||
class ModuleCommonForm(forms.Form):
|
||||
|
||||
def _get_module_bay_tree(self, module_bay):
|
||||
module_bays = []
|
||||
while module_bay:
|
||||
module_bays.append(module_bay)
|
||||
if module_bay.module:
|
||||
module_bay = module_bay.module.module_bay
|
||||
else:
|
||||
module_bay = None
|
||||
|
||||
module_bays.reverse()
|
||||
return module_bays
|
||||
|
||||
def clean(self):
|
||||
super().clean()
|
||||
|
||||
@@ -89,7 +100,7 @@ class ModuleCommonForm(forms.Form):
|
||||
self.instance._disable_replication = True
|
||||
return
|
||||
|
||||
positions = get_module_bay_positions(module_bay)
|
||||
module_bays = self._get_module_bay_tree(module_bay)
|
||||
|
||||
for templates, component_attribute in [
|
||||
("consoleporttemplates", "consoleports"),
|
||||
@@ -108,16 +119,25 @@ class ModuleCommonForm(forms.Form):
|
||||
# Get the templates for the module type.
|
||||
for template in getattr(module_type, templates).all():
|
||||
resolved_name = template.name
|
||||
# Installing modules with placeholders require that the bay has a position value
|
||||
if MODULE_TOKEN in template.name:
|
||||
if not module_bay.position:
|
||||
raise forms.ValidationError(
|
||||
_("Cannot install module with placeholder values in a module bay with no position defined.")
|
||||
)
|
||||
|
||||
try:
|
||||
resolved_name = resolve_module_placeholder(template.name, positions)
|
||||
except ValueError as e:
|
||||
raise forms.ValidationError(str(e))
|
||||
if len(module_bays) != template.name.count(MODULE_TOKEN):
|
||||
raise forms.ValidationError(
|
||||
_(
|
||||
"Cannot install module with placeholder values in a module bay tree {level} in tree "
|
||||
"but {tokens} placeholders given."
|
||||
).format(
|
||||
level=len(module_bays), tokens=template.name.count(MODULE_TOKEN)
|
||||
)
|
||||
)
|
||||
|
||||
for module_bay in module_bays:
|
||||
resolved_name = resolved_name.replace(MODULE_TOKEN, module_bay.position, 1)
|
||||
|
||||
existing_item = installed_components.get(resolved_name)
|
||||
|
||||
|
||||
@@ -9,7 +9,6 @@ from dcim.choices import *
|
||||
from dcim.constants import *
|
||||
from dcim.models.base import PortMappingBase
|
||||
from dcim.models.mixins import InterfaceValidationMixin
|
||||
from dcim.utils import get_module_bay_positions, resolve_module_placeholder
|
||||
from netbox.models import ChangeLoggedModel
|
||||
from utilities.fields import ColorField, NaturalOrderingField
|
||||
from utilities.mptt import TreeManager
|
||||
@@ -166,15 +165,31 @@ class ModularComponentTemplateModel(ComponentTemplateModel):
|
||||
_("A component template must be associated with either a device type or a module type.")
|
||||
)
|
||||
|
||||
def _get_module_tree(self, module):
|
||||
modules = []
|
||||
while module:
|
||||
modules.append(module)
|
||||
if module.module_bay:
|
||||
module = module.module_bay.module
|
||||
else:
|
||||
module = None
|
||||
|
||||
modules.reverse()
|
||||
return modules
|
||||
|
||||
def _resolve_module_placeholder(self, value, module):
|
||||
if MODULE_TOKEN not in value or not module:
|
||||
return value
|
||||
modules = self._get_module_tree(module)
|
||||
for m in modules:
|
||||
value = value.replace(MODULE_TOKEN, m.module_bay.position, 1)
|
||||
return value
|
||||
|
||||
def resolve_name(self, module):
|
||||
if MODULE_TOKEN not in self.name or not module:
|
||||
return self.name
|
||||
return resolve_module_placeholder(self.name, get_module_bay_positions(module.module_bay))
|
||||
return self._resolve_module_placeholder(self.name, module)
|
||||
|
||||
def resolve_label(self, module):
|
||||
if MODULE_TOKEN not in self.label or not module:
|
||||
return self.label
|
||||
return resolve_module_placeholder(self.label, get_module_bay_positions(module.module_bay))
|
||||
return self._resolve_module_placeholder(self.label, module)
|
||||
|
||||
|
||||
class ConsolePortTemplate(ModularComponentTemplateModel):
|
||||
@@ -705,9 +720,7 @@ class ModuleBayTemplate(ModularComponentTemplateModel):
|
||||
verbose_name_plural = _('module bay templates')
|
||||
|
||||
def resolve_position(self, module):
|
||||
if MODULE_TOKEN not in self.position or not module:
|
||||
return self.position
|
||||
return resolve_module_placeholder(self.position, get_module_bay_positions(module.module_bay))
|
||||
return self._resolve_module_placeholder(self.position, module)
|
||||
|
||||
def instantiate(self, **kwargs):
|
||||
return self.component_model(
|
||||
|
||||
@@ -2,7 +2,7 @@ from functools import cached_property
|
||||
|
||||
from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation
|
||||
from django.contrib.postgres.fields import ArrayField
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.core.exceptions import ObjectDoesNotExist, ValidationError
|
||||
from django.core.validators import MaxValueValidator, MinValueValidator
|
||||
from django.db import models
|
||||
from django.db.models import Sum
|
||||
@@ -298,20 +298,24 @@ class CabledObjectModel(models.Model):
|
||||
|
||||
class PathEndpoint(models.Model):
|
||||
"""
|
||||
An abstract model inherited by any CabledObjectModel subclass which represents the end of a CablePath; specifically,
|
||||
these include ConsolePort, ConsoleServerPort, PowerPort, PowerOutlet, Interface, and PowerFeed.
|
||||
An abstract model inherited by any CabledObjectModel subclass which
|
||||
represents the end of a CablePath; specifically, these include
|
||||
ConsolePort, ConsoleServerPort, PowerPort, PowerOutlet, Interface, and
|
||||
PowerFeed.
|
||||
|
||||
`_path` references the CablePath originating from this instance, if any. It is set or cleared by the receivers in
|
||||
dcim.signals in response to changes in the cable path, and complements the `origin` GenericForeignKey field on the
|
||||
CablePath model. `_path` should not be accessed directly; rather, use the `path` property.
|
||||
|
||||
`connected_endpoints()` is a convenience method for returning the destination of the associated CablePath, if any.
|
||||
`_path` references the CablePath originating from this instance, if any.
|
||||
It is set or cleared by the receivers in dcim.signals in response to
|
||||
changes in the cable path, and complements the `origin` GenericForeignKey
|
||||
field on the CablePath model. `_path` should not be accessed directly;
|
||||
rather, use the `path` property. `connected_endpoints()` is a convenience
|
||||
method for returning the destination of the associated CablePath, if any.
|
||||
"""
|
||||
|
||||
_path = models.ForeignKey(
|
||||
to='dcim.CablePath',
|
||||
on_delete=models.SET_NULL,
|
||||
null=True,
|
||||
blank=True
|
||||
blank=True,
|
||||
)
|
||||
|
||||
class Meta:
|
||||
@@ -323,39 +327,74 @@ class PathEndpoint(models.Model):
|
||||
|
||||
# Construct the complete path (including e.g. bridged interfaces)
|
||||
while origin is not None:
|
||||
|
||||
if origin._path is None:
|
||||
# Go through the public accessor rather than dereferencing `_path`
|
||||
# directly. During cable edits, CablePath rows can be deleted and
|
||||
# recreated while this endpoint instance is still in memory.
|
||||
cable_path = origin.path
|
||||
if cable_path is None:
|
||||
break
|
||||
|
||||
path.extend(origin._path.path_objects)
|
||||
path.extend(cable_path.path_objects)
|
||||
|
||||
# If the path ends at a non-connected pass-through port, pad out the link and far-end terminations
|
||||
# If the path ends at a non-connected pass-through port, pad out
|
||||
# the link and far-end terminations.
|
||||
if len(path) % 3 == 1:
|
||||
path.extend(([], []))
|
||||
# If the path ends at a site or provider network, inject a null "link" to render an attachment
|
||||
|
||||
# If the path ends at a site or provider network, inject a null
|
||||
# "link" to render an attachment.
|
||||
elif len(path) % 3 == 2:
|
||||
path.insert(-1, [])
|
||||
|
||||
# Check for a bridged relationship to continue the trace
|
||||
destinations = origin._path.destinations
|
||||
# Check for a bridged relationship to continue the trace.
|
||||
destinations = cable_path.destinations
|
||||
if len(destinations) == 1:
|
||||
origin = getattr(destinations[0], 'bridge', None)
|
||||
else:
|
||||
origin = None
|
||||
|
||||
# Return the path as a list of three-tuples (A termination(s), cable(s), B termination(s))
|
||||
# Return the path as a list of three-tuples
|
||||
# (A termination(s), cable(s), B termination(s))
|
||||
return list(zip(*[iter(path)] * 3))
|
||||
|
||||
@property
|
||||
def path(self):
|
||||
return self._path
|
||||
"""
|
||||
Return this endpoint's current CablePath, if any.
|
||||
|
||||
`_path` is a denormalized reference that is updated from CablePath
|
||||
save/delete handlers, including queryset.update() calls on origin
|
||||
endpoints. That means an already-instantiated endpoint can briefly hold
|
||||
a stale in-memory `_path` relation while the database already points to
|
||||
a different CablePath (or to no path at all).
|
||||
|
||||
If the cached relation points to a CablePath that has just been
|
||||
deleted, refresh only the `_path` field from the database and retry.
|
||||
This keeps the fix cheap and narrowly scoped to the denormalized FK.
|
||||
"""
|
||||
if self._path_id is None:
|
||||
return None
|
||||
|
||||
try:
|
||||
return self._path
|
||||
except ObjectDoesNotExist:
|
||||
# Refresh only the denormalized FK instead of the whole model.
|
||||
# The expected problem here is in-memory staleness during path
|
||||
# rebuilds, not persistent database corruption.
|
||||
self.refresh_from_db(fields=['_path'])
|
||||
return self._path if self._path_id else None
|
||||
|
||||
@cached_property
|
||||
def connected_endpoints(self):
|
||||
"""
|
||||
Caching accessor for the attached CablePath's destination (if any)
|
||||
Caching accessor for the attached CablePath's destinations (if any).
|
||||
|
||||
Always route through `path` so stale in-memory `_path` references are
|
||||
repaired before we cache the result for the lifetime of this instance.
|
||||
"""
|
||||
return self._path.destinations if self._path else []
|
||||
if cable_path := self.path:
|
||||
return cable_path.destinations
|
||||
return []
|
||||
|
||||
|
||||
#
|
||||
|
||||
@@ -5,6 +5,7 @@ from circuits.models import *
|
||||
from core.models import ObjectType
|
||||
from dcim.choices import *
|
||||
from dcim.models import *
|
||||
from extras.events import serialize_for_event
|
||||
from extras.models import CustomField
|
||||
from ipam.models import Prefix
|
||||
from netbox.choices import WeightUnitChoices
|
||||
@@ -893,77 +894,6 @@ class ModuleBayTestCase(TestCase):
|
||||
nested_bay = module.modulebays.get(name='Sub-bay 1-1')
|
||||
self.assertEqual(nested_bay.position, '1-1')
|
||||
|
||||
@tag('regression') # #20474
|
||||
def test_single_module_token_at_nested_depth(self):
|
||||
"""
|
||||
A module type with a single {module} token should install at depth > 1
|
||||
without raising a token count mismatch error, resolving to the immediate
|
||||
parent bay's position.
|
||||
"""
|
||||
manufacturer = Manufacturer.objects.first()
|
||||
site = Site.objects.first()
|
||||
device_role = DeviceRole.objects.first()
|
||||
|
||||
device_type = DeviceType.objects.create(
|
||||
manufacturer=manufacturer,
|
||||
model='Chassis with Rear Card',
|
||||
slug='chassis-with-rear-card'
|
||||
)
|
||||
ModuleBayTemplate.objects.create(
|
||||
device_type=device_type,
|
||||
name='Rear card slot',
|
||||
position='1'
|
||||
)
|
||||
|
||||
rear_card_type = ModuleType.objects.create(
|
||||
manufacturer=manufacturer,
|
||||
model='Rear Card'
|
||||
)
|
||||
ModuleBayTemplate.objects.create(
|
||||
module_type=rear_card_type,
|
||||
name='SFP slot 1',
|
||||
position='1'
|
||||
)
|
||||
ModuleBayTemplate.objects.create(
|
||||
module_type=rear_card_type,
|
||||
name='SFP slot 2',
|
||||
position='2'
|
||||
)
|
||||
|
||||
sfp_type = ModuleType.objects.create(
|
||||
manufacturer=manufacturer,
|
||||
model='SFP Module'
|
||||
)
|
||||
InterfaceTemplate.objects.create(
|
||||
module_type=sfp_type,
|
||||
name='SFP {module}',
|
||||
type=InterfaceTypeChoices.TYPE_10GE_SFP_PLUS
|
||||
)
|
||||
|
||||
device = Device.objects.create(
|
||||
name='Test Chassis',
|
||||
device_type=device_type,
|
||||
role=device_role,
|
||||
site=site
|
||||
)
|
||||
|
||||
rear_card_bay = device.modulebays.get(name='Rear card slot')
|
||||
rear_card = Module.objects.create(
|
||||
device=device,
|
||||
module_bay=rear_card_bay,
|
||||
module_type=rear_card_type
|
||||
)
|
||||
|
||||
sfp_bay = rear_card.modulebays.get(name='SFP slot 2')
|
||||
sfp_module = Module.objects.create(
|
||||
device=device,
|
||||
module_bay=sfp_bay,
|
||||
module_type=sfp_type
|
||||
)
|
||||
|
||||
interface = sfp_module.interfaces.first()
|
||||
self.assertEqual(interface.name, 'SFP 2')
|
||||
|
||||
@tag('regression') # #20912
|
||||
def test_module_bay_parent_cleared_when_module_removed(self):
|
||||
"""Test that the parent field is properly cleared when a module bay's module assignment is removed"""
|
||||
@@ -1345,6 +1275,65 @@ class CableTestCase(TestCase):
|
||||
self.assertEqual(a_terms, [interface1])
|
||||
self.assertEqual(b_terms, [interface2])
|
||||
|
||||
@tag('regression') # #21498
|
||||
def test_path_refreshes_replaced_cablepath_reference(self):
|
||||
"""
|
||||
An already-instantiated interface should refresh its denormalized
|
||||
`_path` foreign key when the referenced CablePath row has been
|
||||
replaced in the database.
|
||||
"""
|
||||
stale_interface = Interface.objects.get(device__name='TestDevice1', name='eth0')
|
||||
old_path = CablePath.objects.get(pk=stale_interface._path_id)
|
||||
|
||||
new_path = CablePath(
|
||||
path=old_path.path,
|
||||
is_active=old_path.is_active,
|
||||
is_complete=old_path.is_complete,
|
||||
is_split=old_path.is_split,
|
||||
)
|
||||
old_path_id = old_path.pk
|
||||
old_path.delete()
|
||||
new_path.save()
|
||||
|
||||
# The old CablePath no longer exists
|
||||
self.assertFalse(CablePath.objects.filter(pk=old_path_id).exists())
|
||||
|
||||
# The already-instantiated interface still points to the deleted path
|
||||
# until the accessor refreshes `_path` from the database.
|
||||
self.assertEqual(stale_interface._path_id, old_path_id)
|
||||
self.assertEqual(stale_interface.path.pk, new_path.pk)
|
||||
|
||||
@tag('regression') # #21498
|
||||
def test_serialize_for_event_handles_stale_cablepath_reference_after_retermination(self):
|
||||
"""
|
||||
Serializing an interface whose previously cached `_path` row has been
|
||||
deleted during cable retermination must not raise.
|
||||
"""
|
||||
stale_interface = Interface.objects.get(device__name='TestDevice2', name='eth0')
|
||||
old_path_id = stale_interface._path_id
|
||||
new_peer = Interface.objects.get(device__name='TestDevice2', name='eth1')
|
||||
cable = stale_interface.cable
|
||||
|
||||
self.assertIsNotNone(cable)
|
||||
self.assertIsNotNone(old_path_id)
|
||||
self.assertEqual(stale_interface.cable_end, 'B')
|
||||
|
||||
cable.b_terminations = [new_peer]
|
||||
cable.save()
|
||||
|
||||
# The old CablePath was deleted during retrace.
|
||||
self.assertFalse(CablePath.objects.filter(pk=old_path_id).exists())
|
||||
|
||||
# The stale in-memory instance still holds the deleted FK value.
|
||||
self.assertEqual(stale_interface._path_id, old_path_id)
|
||||
|
||||
# Serialization must not raise ObjectDoesNotExist. Because this interface
|
||||
# was the former B-side termination, it is now disconnected.
|
||||
data = serialize_for_event(stale_interface)
|
||||
self.assertIsNone(data['connected_endpoints'])
|
||||
self.assertIsNone(data['connected_endpoints_type'])
|
||||
self.assertFalse(data['connected_endpoints_reachable'])
|
||||
|
||||
|
||||
class VirtualDeviceContextTestCase(TestCase):
|
||||
|
||||
|
||||
@@ -420,23 +420,14 @@ class VirtualChassisMembersPanel(panels.ObjectPanel):
|
||||
"""
|
||||
A panel which lists all members of a virtual chassis.
|
||||
"""
|
||||
|
||||
template_name = 'dcim/panels/virtual_chassis_members.html'
|
||||
title = _('Virtual Chassis Members')
|
||||
actions = [
|
||||
actions.AddObject(
|
||||
'dcim.device',
|
||||
url_params={
|
||||
'site': lambda ctx: (
|
||||
ctx['virtual_chassis'].master.site_id
|
||||
if ctx['virtual_chassis'] and ctx['virtual_chassis'].master_id
|
||||
else ''
|
||||
),
|
||||
'rack': lambda ctx: (
|
||||
ctx['virtual_chassis'].master.rack_id
|
||||
if ctx['virtual_chassis'] and ctx['virtual_chassis'].master_id
|
||||
else ''
|
||||
),
|
||||
'site': lambda ctx: ctx['object'].master.site_id if ctx['object'].master else '',
|
||||
'rack': lambda ctx: ctx['object'].master.rack_id if ctx['object'].master else '',
|
||||
},
|
||||
),
|
||||
]
|
||||
@@ -541,7 +532,7 @@ class VirtualCircuitPanel(panels.ObjectPanel):
|
||||
|
||||
def render(self, context):
|
||||
obj = context.get('object')
|
||||
if not obj or not obj.is_virtual or not hasattr(obj, 'virtual_circuit_termination'):
|
||||
if not obj or not obj.is_virtual or not obj.virtual_circuit_termination:
|
||||
return ''
|
||||
ctx = self.get_context(context)
|
||||
return render_to_string(self.template_name, ctx, request=ctx.get('request'))
|
||||
|
||||
@@ -3,9 +3,6 @@ from collections import defaultdict
|
||||
from django.apps import apps
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.db import router, transaction
|
||||
from django.utils.translation import gettext as _
|
||||
|
||||
from dcim.constants import MODULE_TOKEN
|
||||
|
||||
|
||||
def compile_path_node(ct_id, object_id):
|
||||
@@ -36,51 +33,6 @@ def path_node_to_object(repr):
|
||||
return ct.model_class().objects.filter(pk=object_id).first()
|
||||
|
||||
|
||||
def get_module_bay_positions(module_bay):
|
||||
"""
|
||||
Given a module bay, traverse up the module hierarchy and return
|
||||
a list of bay position strings from root to leaf.
|
||||
"""
|
||||
positions = []
|
||||
while module_bay:
|
||||
positions.append(module_bay.position)
|
||||
if module_bay.module:
|
||||
module_bay = module_bay.module.module_bay
|
||||
else:
|
||||
module_bay = None
|
||||
positions.reverse()
|
||||
return positions
|
||||
|
||||
|
||||
def resolve_module_placeholder(value, positions):
|
||||
"""
|
||||
Resolve {module} placeholder tokens in a string using the given
|
||||
list of module bay positions (ordered root to leaf).
|
||||
|
||||
A single {module} token resolves to the leaf (immediate parent) bay's position.
|
||||
Multiple tokens must match the tree depth and resolve level-by-level.
|
||||
|
||||
Returns the resolved string.
|
||||
Raises ValueError if token count is greater than 1 and doesn't match tree depth.
|
||||
"""
|
||||
if MODULE_TOKEN not in value:
|
||||
return value
|
||||
|
||||
token_count = value.count(MODULE_TOKEN)
|
||||
if token_count == 1:
|
||||
return value.replace(MODULE_TOKEN, positions[-1])
|
||||
if token_count == len(positions):
|
||||
for pos in positions:
|
||||
value = value.replace(MODULE_TOKEN, pos, 1)
|
||||
return value
|
||||
raise ValueError(
|
||||
_("Cannot install module with placeholder values in a module bay tree "
|
||||
"{level} levels deep but {tokens} placeholders given.").format(
|
||||
level=len(positions), tokens=token_count
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def create_cablepaths(objects):
|
||||
"""
|
||||
Create CablePaths for all paths originating from the specified set of nodes.
|
||||
|
||||
@@ -2,7 +2,7 @@ from django.utils.translation import gettext as _
|
||||
from drf_spectacular.types import OpenApiTypes
|
||||
from drf_spectacular.utils import extend_schema_field
|
||||
from rest_framework.fields import Field
|
||||
from rest_framework.serializers import ListSerializer, ValidationError
|
||||
from rest_framework.serializers import ValidationError
|
||||
|
||||
from extras.choices import CustomFieldTypeChoices
|
||||
from extras.constants import CUSTOMFIELD_EMPTY_VALUES
|
||||
@@ -49,25 +49,8 @@ class CustomFieldsDataField(Field):
|
||||
# TODO: Fix circular import
|
||||
from utilities.api import get_serializer_for_model
|
||||
data = {}
|
||||
cache = self.parent.context.get('cf_object_cache')
|
||||
|
||||
for cf in self._get_custom_fields():
|
||||
if cache is not None and cf.type in (
|
||||
CustomFieldTypeChoices.TYPE_OBJECT,
|
||||
CustomFieldTypeChoices.TYPE_MULTIOBJECT,
|
||||
):
|
||||
raw = obj.get(cf.name)
|
||||
if raw is None:
|
||||
value = None
|
||||
elif cf.type == CustomFieldTypeChoices.TYPE_OBJECT:
|
||||
model = cf.related_object_type.model_class()
|
||||
value = cache.get((model, raw))
|
||||
else:
|
||||
model = cf.related_object_type.model_class()
|
||||
value = [cache[(model, pk)] for pk in raw if (model, pk) in cache] or None
|
||||
else:
|
||||
value = cf.deserialize(obj.get(cf.name))
|
||||
|
||||
value = cf.deserialize(obj.get(cf.name))
|
||||
if value is not None and cf.type == CustomFieldTypeChoices.TYPE_OBJECT:
|
||||
serializer = get_serializer_for_model(cf.related_object_type.model_class())
|
||||
value = serializer(value, nested=True, context=self.parent.context).data
|
||||
@@ -104,32 +87,3 @@ class CustomFieldsDataField(Field):
|
||||
data = {**self.parent.instance.custom_field_data, **data}
|
||||
|
||||
return data
|
||||
|
||||
|
||||
class CustomFieldListSerializer(ListSerializer):
|
||||
"""
|
||||
ListSerializer that pre-fetches all OBJECT/MULTIOBJECT custom field related objects
|
||||
in bulk before per-item serialization.
|
||||
"""
|
||||
def to_representation(self, data):
|
||||
cf_field = self.child.fields.get('custom_fields')
|
||||
if isinstance(cf_field, CustomFieldsDataField):
|
||||
object_type_cfs = [
|
||||
cf for cf in cf_field._get_custom_fields()
|
||||
if cf.type in (CustomFieldTypeChoices.TYPE_OBJECT, CustomFieldTypeChoices.TYPE_MULTIOBJECT)
|
||||
]
|
||||
cache = {}
|
||||
for cf in object_type_cfs:
|
||||
model = cf.related_object_type.model_class()
|
||||
pks = set()
|
||||
for item in data:
|
||||
raw = item.custom_field_data.get(cf.name)
|
||||
if raw is not None:
|
||||
if cf.type == CustomFieldTypeChoices.TYPE_MULTIOBJECT:
|
||||
pks.update(raw)
|
||||
else:
|
||||
pks.add(raw)
|
||||
for obj in model.objects.filter(pk__in=pks):
|
||||
cache[(model, obj.pk)] = obj
|
||||
self.child.context['cf_object_cache'] = cache
|
||||
return super().to_representation(data)
|
||||
|
||||
@@ -1,70 +1,19 @@
|
||||
import logging
|
||||
|
||||
from django.core.files.storage import storages
|
||||
from django.db import IntegrityError
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from django.utils.translation import gettext as _
|
||||
from drf_spectacular.utils import extend_schema_field
|
||||
from rest_framework import serializers
|
||||
|
||||
from core.api.serializers_.jobs import JobSerializer
|
||||
from core.choices import ManagedFileRootPathChoices
|
||||
from extras.models import Script, ScriptModule
|
||||
from extras.models import Script
|
||||
from netbox.api.serializers import ValidatedModelSerializer
|
||||
from utilities.datetime import local_now
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
__all__ = (
|
||||
'ScriptDetailSerializer',
|
||||
'ScriptInputSerializer',
|
||||
'ScriptModuleSerializer',
|
||||
'ScriptSerializer',
|
||||
)
|
||||
|
||||
|
||||
class ScriptModuleSerializer(ValidatedModelSerializer):
|
||||
file = serializers.FileField(write_only=True)
|
||||
file_path = serializers.CharField(read_only=True)
|
||||
|
||||
class Meta:
|
||||
model = ScriptModule
|
||||
fields = ['id', 'display', 'file_path', 'file', 'created', 'last_updated']
|
||||
brief_fields = ('id', 'display')
|
||||
|
||||
def validate(self, data):
|
||||
# ScriptModule.save() sets file_root; inject it here so full_clean() succeeds.
|
||||
# Pop 'file' before model instantiation — ScriptModule has no such field.
|
||||
file = data.pop('file', None)
|
||||
data['file_root'] = ManagedFileRootPathChoices.SCRIPTS
|
||||
data = super().validate(data)
|
||||
data.pop('file_root', None)
|
||||
if file is not None:
|
||||
data['file'] = file
|
||||
return data
|
||||
|
||||
def create(self, validated_data):
|
||||
file = validated_data.pop('file')
|
||||
storage = storages.create_storage(storages.backends["scripts"])
|
||||
validated_data['file_path'] = storage.save(file.name, file)
|
||||
created = False
|
||||
try:
|
||||
instance = super().create(validated_data)
|
||||
created = True
|
||||
return instance
|
||||
except IntegrityError as e:
|
||||
if 'file_path' in str(e):
|
||||
raise serializers.ValidationError(
|
||||
_("A script module with this file name already exists.")
|
||||
)
|
||||
raise
|
||||
finally:
|
||||
if not created and (file_path := validated_data.get('file_path')):
|
||||
try:
|
||||
storage.delete(file_path)
|
||||
except Exception:
|
||||
logger.warning(f"Failed to delete orphaned script file '{file_path}' from storage.")
|
||||
|
||||
|
||||
class ScriptSerializer(ValidatedModelSerializer):
|
||||
description = serializers.SerializerMethodField(read_only=True)
|
||||
vars = serializers.SerializerMethodField(read_only=True)
|
||||
|
||||
@@ -26,7 +26,6 @@ router.register('journal-entries', views.JournalEntryViewSet)
|
||||
router.register('config-contexts', views.ConfigContextViewSet)
|
||||
router.register('config-context-profiles', views.ConfigContextProfileViewSet)
|
||||
router.register('config-templates', views.ConfigTemplateViewSet)
|
||||
router.register('scripts/upload', views.ScriptModuleViewSet)
|
||||
router.register('scripts', views.ScriptViewSet, basename='script')
|
||||
|
||||
app_name = 'extras-api'
|
||||
|
||||
@@ -6,7 +6,7 @@ from rest_framework import status
|
||||
from rest_framework.decorators import action
|
||||
from rest_framework.exceptions import PermissionDenied
|
||||
from rest_framework.generics import RetrieveUpdateDestroyAPIView
|
||||
from rest_framework.mixins import CreateModelMixin, ListModelMixin, RetrieveModelMixin
|
||||
from rest_framework.mixins import ListModelMixin, RetrieveModelMixin
|
||||
from rest_framework.renderers import JSONRenderer
|
||||
from rest_framework.response import Response
|
||||
from rest_framework.routers import APIRootView
|
||||
@@ -21,7 +21,6 @@ from netbox.api.features import SyncedDataMixin
|
||||
from netbox.api.metadata import ContentTypeMetadata
|
||||
from netbox.api.renderers import TextRenderer
|
||||
from netbox.api.viewsets import BaseViewSet, NetBoxModelViewSet
|
||||
from netbox.api.viewsets.mixins import ObjectValidationMixin
|
||||
from utilities.exceptions import RQWorkerNotRunningException
|
||||
from utilities.request import copy_safe_request
|
||||
|
||||
@@ -265,11 +264,6 @@ class ConfigTemplateViewSet(SyncedDataMixin, ConfigTemplateRenderMixin, NetBoxMo
|
||||
# Scripts
|
||||
#
|
||||
|
||||
class ScriptModuleViewSet(ObjectValidationMixin, CreateModelMixin, BaseViewSet):
|
||||
queryset = ScriptModule.objects.all()
|
||||
serializer_class = serializers.ScriptModuleSerializer
|
||||
|
||||
|
||||
@extend_schema_view(
|
||||
update=extend_schema(request=serializers.ScriptInputSerializer),
|
||||
partial_update=extend_schema(request=serializers.ScriptInputSerializer),
|
||||
|
||||
@@ -25,22 +25,59 @@ logger = logging.getLogger('netbox.events_processor')
|
||||
|
||||
class EventContext(UserDict):
|
||||
"""
|
||||
A custom dictionary that automatically serializes its associated object on demand.
|
||||
Dictionary-compatible wrapper for queued events that lazily serializes
|
||||
``event['data']`` on first access.
|
||||
|
||||
Backward-compatible with the plain-dict interface expected by existing
|
||||
EVENTS_PIPELINE consumers. When the same object is enqueued more than once
|
||||
in a single request, the serialization source is updated so consumers see
|
||||
the latest state.
|
||||
"""
|
||||
|
||||
# We're emulating a dictionary here (rather than using a custom class) because prior to NetBox v4.5.2, events were
|
||||
# queued as dictionaries for processing by handles in EVENTS_PIPELINE. We need to avoid introducing any breaking
|
||||
# changes until a suitable minor release.
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
# Track which model instance should be serialized if/when `data` is
|
||||
# requested. This may be refreshed on duplicate enqueue, while leaving
|
||||
# the public `object` entry untouched for compatibility.
|
||||
self._serialization_source = self.get('object')
|
||||
|
||||
def refresh_serialization_source(self, instance):
|
||||
"""
|
||||
Point lazy serialization at a fresher instance, invalidating any
|
||||
already-materialized ``data``.
|
||||
"""
|
||||
self._serialization_source = instance
|
||||
# UserDict.__contains__ checks the backing dict directly, so `in`
|
||||
# does not trigger __getitem__'s lazy serialization.
|
||||
if 'data' in self:
|
||||
del self['data']
|
||||
|
||||
def freeze_data(self, instance):
|
||||
"""
|
||||
Eagerly serialize and cache the payload for delete events, where the
|
||||
object may become inaccessible after deletion.
|
||||
"""
|
||||
super().__setitem__('data', serialize_for_event(instance))
|
||||
self._serialization_source = None
|
||||
|
||||
def __getitem__(self, item):
|
||||
if item == 'data' and 'data' not in self:
|
||||
data = serialize_for_event(self['object'])
|
||||
self.__setitem__('data', data)
|
||||
# Materialize the payload only when an event consumer asks for it.
|
||||
#
|
||||
# On coalesced events, use the latest explicitly queued instance so
|
||||
# webhooks/scripts/notifications observe the final queued state for
|
||||
# that object within the request.
|
||||
source = self._serialization_source or super().__getitem__('object')
|
||||
super().__setitem__('data', serialize_for_event(source))
|
||||
|
||||
return super().__getitem__(item)
|
||||
|
||||
|
||||
def serialize_for_event(instance):
|
||||
"""
|
||||
Return a serialized representation of the given instance suitable for use in a queued event.
|
||||
Return a serialized representation of the given instance suitable for use
|
||||
in a queued event.
|
||||
"""
|
||||
serializer_class = get_serializer_for_model(instance.__class__)
|
||||
serializer_context = {
|
||||
@@ -53,7 +90,8 @@ def serialize_for_event(instance):
|
||||
|
||||
def get_snapshots(instance, event_type):
|
||||
"""
|
||||
Return a dictionary of pre- and post-change snapshots for the given instance.
|
||||
Return a dictionary of pre- and post-change snapshots for the given
|
||||
instance.
|
||||
"""
|
||||
if event_type == OBJECT_DELETED:
|
||||
# Post-change snapshot must be empty for deleted objects
|
||||
@@ -76,8 +114,9 @@ def get_snapshots(instance, event_type):
|
||||
|
||||
def enqueue_event(queue, instance, request, event_type):
|
||||
"""
|
||||
Enqueue a serialized representation of a created/updated/deleted object for the processing of
|
||||
events once the request has completed.
|
||||
Enqueue (or coalesce) an event for a created/updated/deleted object.
|
||||
|
||||
Events are processed after the request completes.
|
||||
"""
|
||||
# Bail if this type of object does not support event rules
|
||||
if not has_feature(instance, 'event_rules'):
|
||||
@@ -88,11 +127,18 @@ def enqueue_event(queue, instance, request, event_type):
|
||||
|
||||
assert instance.pk is not None
|
||||
key = f'{app_label}.{model_name}:{instance.pk}'
|
||||
|
||||
if key in queue:
|
||||
queue[key]['snapshots']['postchange'] = get_snapshots(instance, event_type)['postchange']
|
||||
# If the object is being deleted, update any prior "update" event to "delete"
|
||||
|
||||
# If the object is being deleted, convert any prior update event into a
|
||||
# delete event and freeze the payload before the object (or related
|
||||
# rows) become inaccessible.
|
||||
if event_type == OBJECT_DELETED:
|
||||
queue[key]['event_type'] = event_type
|
||||
else:
|
||||
# Keep the public `object` entry stable for compatibility.
|
||||
queue[key].refresh_serialization_source(instance)
|
||||
else:
|
||||
queue[key] = EventContext(
|
||||
object_type=ObjectType.objects.get_for_model(instance),
|
||||
@@ -106,14 +152,16 @@ def enqueue_event(queue, instance, request, event_type):
|
||||
username=request.user.username, # DEPRECATED, will be removed in NetBox v4.7.0
|
||||
request_id=request.id, # DEPRECATED, will be removed in NetBox v4.7.0
|
||||
)
|
||||
# Force serialization of objects prior to them actually being deleted
|
||||
|
||||
# For delete events, eagerly serialize the payload before the row is gone.
|
||||
# This covers both first-time enqueues and coalesced update→delete promotions.
|
||||
if event_type == OBJECT_DELETED:
|
||||
queue[key]['data'] = serialize_for_event(instance)
|
||||
queue[key].freeze_data(instance)
|
||||
|
||||
|
||||
def process_event_rules(event_rules, object_type, event):
|
||||
"""
|
||||
Process a list of EventRules against an event.
|
||||
Evaluate and dispatch a list of EventRules against an event.
|
||||
|
||||
Notes on event sources:
|
||||
- Object change events (created/updated/deleted) are enqueued via
|
||||
@@ -133,9 +181,9 @@ def process_event_rules(event_rules, object_type, event):
|
||||
if not event_rule.eval_conditions(event['data']):
|
||||
continue
|
||||
|
||||
# Compile event data
|
||||
event_data = event_rule.action_data or {}
|
||||
event_data.update(event['data'])
|
||||
# Merge rule-specific action_data with the event payload.
|
||||
# Copy to avoid mutating the rule's stored action_data dict.
|
||||
event_data = {**(event_rule.action_data or {}), **event['data']}
|
||||
|
||||
# Webhooks
|
||||
if event_rule.action_type == EventRuleActionChoices.WEBHOOK:
|
||||
|
||||
@@ -74,7 +74,7 @@ class CustomFieldManager(models.Manager.from_queryset(RestrictedQuerySet)):
|
||||
return custom_fields
|
||||
|
||||
content_type = ObjectType.objects.get_for_model(model._meta.concrete_model)
|
||||
custom_fields = self.get_queryset().filter(object_types=content_type).select_related('related_object_type')
|
||||
custom_fields = self.get_queryset().filter(object_types=content_type)
|
||||
|
||||
# Populate the request cache to avoid redundant lookups
|
||||
if cache is not None:
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
import datetime
|
||||
import hashlib
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.core.files.uploadedfile import SimpleUploadedFile
|
||||
from django.urls import reverse
|
||||
from django.utils.timezone import make_aware, now
|
||||
from rest_framework import status
|
||||
@@ -1386,54 +1384,3 @@ class NotificationTest(APIViewTestCases.APIViewTestCase):
|
||||
'event_type': OBJECT_DELETED,
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
class ScriptModuleTest(APITestCase):
|
||||
"""
|
||||
Tests for the POST /api/extras/scripts/upload/ endpoint.
|
||||
|
||||
ScriptModule is a proxy of core.ManagedFile (a different app) so the standard
|
||||
APIViewTestCases mixins cannot be used directly. All tests use add_permissions()
|
||||
with explicit Django model-level permissions.
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.url = reverse('extras-api:scriptmodule-list') # /api/extras/scripts/upload/
|
||||
|
||||
def test_upload_script_module_without_permission(self):
|
||||
script_content = b"from extras.scripts import Script\nclass TestScript(Script):\n pass\n"
|
||||
upload_file = SimpleUploadedFile('test_upload.py', script_content, content_type='text/plain')
|
||||
response = self.client.post(
|
||||
self.url,
|
||||
{'file': upload_file},
|
||||
format='multipart',
|
||||
**self.header,
|
||||
)
|
||||
self.assertHttpStatus(response, status.HTTP_403_FORBIDDEN)
|
||||
|
||||
def test_upload_script_module(self):
|
||||
# ScriptModule is a proxy of core.ManagedFile; both permissions required.
|
||||
self.add_permissions('extras.add_scriptmodule', 'core.add_managedfile')
|
||||
script_content = b"from extras.scripts import Script\nclass TestScript(Script):\n pass\n"
|
||||
upload_file = SimpleUploadedFile('test_upload.py', script_content, content_type='text/plain')
|
||||
mock_storage = MagicMock()
|
||||
mock_storage.save.return_value = 'test_upload.py'
|
||||
with patch('extras.api.serializers_.scripts.storages') as mock_storages:
|
||||
mock_storages.create_storage.return_value = mock_storage
|
||||
mock_storages.backends = {'scripts': {}}
|
||||
response = self.client.post(
|
||||
self.url,
|
||||
{'file': upload_file},
|
||||
format='multipart',
|
||||
**self.header,
|
||||
)
|
||||
self.assertHttpStatus(response, status.HTTP_201_CREATED)
|
||||
self.assertEqual(response.data['file_path'], 'test_upload.py')
|
||||
mock_storage.save.assert_called_once()
|
||||
self.assertTrue(ScriptModule.objects.filter(file_path='test_upload.py').exists())
|
||||
|
||||
def test_upload_script_module_without_file_fails(self):
|
||||
self.add_permissions('extras.add_scriptmodule', 'core.add_managedfile')
|
||||
response = self.client.post(self.url, {}, format='json', **self.header)
|
||||
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
import json
|
||||
import uuid
|
||||
from unittest import skipIf
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
import django_rq
|
||||
from django.conf import settings
|
||||
from django.http import HttpResponse
|
||||
from django.test import RequestFactory
|
||||
from django.urls import reverse
|
||||
@@ -343,6 +345,7 @@ class EventRuleTest(APITestCase):
|
||||
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], sites[i].name)
|
||||
self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
|
||||
|
||||
@skipIf('netbox.tests.dummy_plugin' not in settings.PLUGINS, 'dummy_plugin not in settings.PLUGINS')
|
||||
def test_send_webhook(self):
|
||||
request_id = uuid.uuid4()
|
||||
|
||||
@@ -426,6 +429,97 @@ class EventRuleTest(APITestCase):
|
||||
self.assertEqual(job.kwargs['object_type'], script_type)
|
||||
self.assertEqual(job.kwargs['username'], self.user.username)
|
||||
|
||||
def test_duplicate_enqueue_refreshes_lazy_payload(self):
|
||||
"""
|
||||
When the same object is enqueued more than once in a single request,
|
||||
lazy serialization should use the most recently enqueued instance while
|
||||
preserving the original event['object'] reference.
|
||||
"""
|
||||
request = RequestFactory().get(reverse('dcim:site_add'))
|
||||
request.id = uuid.uuid4()
|
||||
request.user = self.user
|
||||
|
||||
site = Site.objects.create(name='Site 1', slug='site-1')
|
||||
stale_site = Site.objects.get(pk=site.pk)
|
||||
|
||||
queue = {}
|
||||
enqueue_event(queue, stale_site, request, OBJECT_UPDATED)
|
||||
|
||||
event = queue[f'dcim.site:{site.pk}']
|
||||
|
||||
# Data should not be materialized yet (lazy serialization)
|
||||
self.assertNotIn('data', event.data)
|
||||
|
||||
fresh_site = Site.objects.get(pk=site.pk)
|
||||
fresh_site.description = 'foo'
|
||||
fresh_site.save()
|
||||
|
||||
enqueue_event(queue, fresh_site, request, OBJECT_UPDATED)
|
||||
|
||||
# The original object reference should be preserved
|
||||
self.assertIs(event['object'], stale_site)
|
||||
|
||||
# But serialized data should reflect the fresher instance
|
||||
self.assertEqual(event['data']['description'], 'foo')
|
||||
self.assertEqual(event['snapshots']['postchange']['description'], 'foo')
|
||||
|
||||
def test_duplicate_enqueue_invalidates_materialized_data(self):
|
||||
"""
|
||||
If event['data'] has already been materialized before a second enqueue
|
||||
for the same object, the stale payload should be discarded and rebuilt
|
||||
from the fresher instance on next access.
|
||||
"""
|
||||
request = RequestFactory().get(reverse('dcim:site_add'))
|
||||
request.id = uuid.uuid4()
|
||||
request.user = self.user
|
||||
|
||||
site = Site.objects.create(name='Site 1', slug='site-1')
|
||||
|
||||
queue = {}
|
||||
enqueue_event(queue, site, request, OBJECT_UPDATED)
|
||||
|
||||
event = queue[f'dcim.site:{site.pk}']
|
||||
|
||||
# Force early materialization
|
||||
self.assertEqual(event['data']['description'], '')
|
||||
|
||||
# Now update and re-enqueue
|
||||
fresh_site = Site.objects.get(pk=site.pk)
|
||||
fresh_site.description = 'updated'
|
||||
fresh_site.save()
|
||||
|
||||
enqueue_event(queue, fresh_site, request, OBJECT_UPDATED)
|
||||
|
||||
# Stale data should have been invalidated; new access should reflect update
|
||||
self.assertEqual(event['data']['description'], 'updated')
|
||||
|
||||
def test_update_then_delete_enqueue_freezes_payload(self):
|
||||
"""
|
||||
When an update event is coalesced with a subsequent delete, the event
|
||||
type should be promoted to OBJECT_DELETED and the payload should be
|
||||
eagerly frozen (since the object will be inaccessible after deletion).
|
||||
"""
|
||||
request = RequestFactory().get(reverse('dcim:site_add'))
|
||||
request.id = uuid.uuid4()
|
||||
request.user = self.user
|
||||
|
||||
site = Site.objects.create(name='Site 1', slug='site-1')
|
||||
|
||||
queue = {}
|
||||
enqueue_event(queue, site, request, OBJECT_UPDATED)
|
||||
|
||||
event = queue[f'dcim.site:{site.pk}']
|
||||
|
||||
enqueue_event(queue, site, request, OBJECT_DELETED)
|
||||
|
||||
# Event type should have been promoted
|
||||
self.assertEqual(event['event_type'], OBJECT_DELETED)
|
||||
|
||||
# Data should already be materialized (frozen), not lazy
|
||||
self.assertIn('data', event.data)
|
||||
self.assertEqual(event['data']['name'], 'Site 1')
|
||||
self.assertIsNone(event['snapshots']['postchange'])
|
||||
|
||||
def test_duplicate_triggers(self):
|
||||
"""
|
||||
Test for erroneous duplicate event triggers resulting from saving an object multiple times
|
||||
|
||||
@@ -159,11 +159,9 @@ class Aggregate(ContactsMixin, GetAvailablePrefixesMixin, PrimaryModel):
|
||||
|
||||
@property
|
||||
def family(self):
|
||||
if not self.prefix:
|
||||
return None
|
||||
if isinstance(self.prefix, str):
|
||||
return netaddr.IPNetwork(self.prefix).version
|
||||
return self.prefix.version
|
||||
if self.prefix:
|
||||
return self.prefix.version
|
||||
return None
|
||||
|
||||
@property
|
||||
def ipv6_full(self):
|
||||
@@ -337,19 +335,11 @@ class Prefix(ContactsMixin, GetAvailablePrefixesMixin, CachedScopeMixin, Primary
|
||||
|
||||
@property
|
||||
def family(self):
|
||||
if not self.prefix:
|
||||
return None
|
||||
if isinstance(self.prefix, str):
|
||||
return netaddr.IPNetwork(self.prefix).version
|
||||
return self.prefix.version
|
||||
return self.prefix.version if self.prefix else None
|
||||
|
||||
@property
|
||||
def mask_length(self):
|
||||
if not self.prefix:
|
||||
return None
|
||||
if isinstance(self.prefix, str):
|
||||
return netaddr.IPNetwork(self.prefix).prefixlen
|
||||
return self.prefix.prefixlen
|
||||
return self.prefix.prefixlen if self.prefix else None
|
||||
|
||||
@property
|
||||
def ipv6_full(self):
|
||||
@@ -652,11 +642,7 @@ class IPRange(ContactsMixin, PrimaryModel):
|
||||
|
||||
@property
|
||||
def family(self):
|
||||
if not self.start_address:
|
||||
return None
|
||||
if isinstance(self.start_address, str):
|
||||
return netaddr.IPAddress(self.start_address.split('/')[0]).version
|
||||
return self.start_address.version
|
||||
return self.start_address.version if self.start_address else None
|
||||
|
||||
@property
|
||||
def range(self):
|
||||
@@ -1004,11 +990,9 @@ class IPAddress(ContactsMixin, PrimaryModel):
|
||||
|
||||
@property
|
||||
def family(self):
|
||||
if not self.address:
|
||||
return None
|
||||
if isinstance(self.address, str):
|
||||
return netaddr.IPNetwork(self.address).version
|
||||
return self.address.version
|
||||
if self.address:
|
||||
return self.address.version
|
||||
return None
|
||||
|
||||
@property
|
||||
def is_oob_ip(self):
|
||||
|
||||
@@ -11,13 +11,6 @@ from utilities.data import string_to_ranges
|
||||
|
||||
class TestAggregate(TestCase):
|
||||
|
||||
def test_family_string(self):
|
||||
# Test property when prefix is a string
|
||||
agg = Aggregate(prefix='10.0.0.0/8')
|
||||
self.assertEqual(agg.family, 4)
|
||||
agg_v6 = Aggregate(prefix='2001:db8::/32')
|
||||
self.assertEqual(agg_v6.family, 6)
|
||||
|
||||
def test_get_utilization(self):
|
||||
rir = RIR.objects.create(name='RIR 1', slug='rir-1')
|
||||
aggregate = Aggregate(prefix=IPNetwork('10.0.0.0/8'), rir=rir)
|
||||
@@ -47,13 +40,6 @@ class TestAggregate(TestCase):
|
||||
|
||||
class TestIPRange(TestCase):
|
||||
|
||||
def test_family_string(self):
|
||||
# Test property when start_address is a string
|
||||
ip_range = IPRange(start_address='10.0.0.1/24', end_address='10.0.0.254/24')
|
||||
self.assertEqual(ip_range.family, 4)
|
||||
ip_range_v6 = IPRange(start_address='2001:db8::1/64', end_address='2001:db8::ffff/64')
|
||||
self.assertEqual(ip_range_v6.family, 6)
|
||||
|
||||
def test_overlapping_range(self):
|
||||
iprange_192_168 = IPRange.objects.create(
|
||||
start_address=IPNetwork('192.168.0.1/22'), end_address=IPNetwork('192.168.0.49/22')
|
||||
@@ -104,20 +90,6 @@ class TestIPRange(TestCase):
|
||||
|
||||
class TestPrefix(TestCase):
|
||||
|
||||
def test_family_string(self):
|
||||
# Test property when prefix is a string
|
||||
prefix = Prefix(prefix='10.0.0.0/8')
|
||||
self.assertEqual(prefix.family, 4)
|
||||
prefix_v6 = Prefix(prefix='2001:db8::/32')
|
||||
self.assertEqual(prefix_v6.family, 6)
|
||||
|
||||
def test_mask_length_string(self):
|
||||
# Test property when prefix is a string
|
||||
prefix = Prefix(prefix='10.0.0.0/8')
|
||||
self.assertEqual(prefix.mask_length, 8)
|
||||
prefix_v6 = Prefix(prefix='2001:db8::/32')
|
||||
self.assertEqual(prefix_v6.mask_length, 32)
|
||||
|
||||
def test_get_duplicates(self):
|
||||
prefixes = Prefix.objects.bulk_create((
|
||||
Prefix(prefix=IPNetwork('192.0.2.0/24')),
|
||||
@@ -561,13 +533,6 @@ class TestPrefixHierarchy(TestCase):
|
||||
|
||||
class TestIPAddress(TestCase):
|
||||
|
||||
def test_family_string(self):
|
||||
# Test property when address is a string
|
||||
ip = IPAddress(address='10.0.0.1/24')
|
||||
self.assertEqual(ip.family, 4)
|
||||
ip_v6 = IPAddress(address='2001:db8::1/64')
|
||||
self.assertEqual(ip_v6.family, 6)
|
||||
|
||||
def test_get_duplicates(self):
|
||||
ips = IPAddress.objects.bulk_create((
|
||||
IPAddress(address=IPNetwork('192.0.2.1/24')),
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
from rest_framework import serializers
|
||||
from rest_framework.fields import CreateOnlyDefault
|
||||
|
||||
from extras.api.customfields import CustomFieldDefaultValues, CustomFieldListSerializer, CustomFieldsDataField
|
||||
from extras.api.customfields import CustomFieldDefaultValues, CustomFieldsDataField
|
||||
|
||||
from .base import ValidatedModelSerializer
|
||||
from .nested import NestedTagSerializer
|
||||
@@ -23,29 +23,6 @@ class CustomFieldModelSerializer(serializers.Serializer):
|
||||
default=CreateOnlyDefault(CustomFieldDefaultValues())
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def many_init(cls, *args, **kwargs):
|
||||
"""
|
||||
We can't call super().many_init() and change the outcome because by the time it returns,
|
||||
the plain ListSerializer is already instantiated.
|
||||
Because every NetBox serializer defines its own Meta which doesn't inherit from a parent Meta,
|
||||
this would silently not apply to any real serializer.
|
||||
Thats why this method replicates many_init from parent and changed the default value for list_serializer_class.
|
||||
"""
|
||||
list_kwargs = {}
|
||||
for key in serializers.LIST_SERIALIZER_KWARGS_REMOVE:
|
||||
value = kwargs.pop(key, None)
|
||||
if value is not None:
|
||||
list_kwargs[key] = value
|
||||
list_kwargs['child'] = cls(*args, **kwargs)
|
||||
list_kwargs.update({
|
||||
key: value for key, value in kwargs.items()
|
||||
if key in serializers.LIST_SERIALIZER_KWARGS
|
||||
})
|
||||
meta = getattr(cls, 'Meta', None)
|
||||
list_serializer_class = getattr(meta, 'list_serializer_class', CustomFieldListSerializer)
|
||||
return list_serializer_class(*args, **list_kwargs)
|
||||
|
||||
|
||||
class TaggableModelSerializer(serializers.Serializer):
|
||||
"""
|
||||
|
||||
@@ -159,7 +159,7 @@ class BaseTable(tables.Table):
|
||||
columns = None
|
||||
ordering = None
|
||||
|
||||
if request.user.is_authenticated and self.prefixed_order_by_field in request.GET:
|
||||
if self.prefixed_order_by_field in request.GET:
|
||||
if request.GET[self.prefixed_order_by_field]:
|
||||
# If an ordering has been specified as a query parameter, save it as the
|
||||
# user's preferred ordering for this table.
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
from django.contrib.auth.models import AnonymousUser
|
||||
from django.template import Context, Template
|
||||
from django.test import RequestFactory, TestCase
|
||||
|
||||
@@ -47,16 +46,6 @@ class BaseTableTest(TestCase):
|
||||
prefetch_lookups = table.data.data._prefetch_related_lookups
|
||||
self.assertEqual(prefetch_lookups, tuple())
|
||||
|
||||
def test_configure_anonymous_user_with_ordering(self):
|
||||
"""
|
||||
Verify that table.configure() does not raise an error when an anonymous
|
||||
user sorts a table column.
|
||||
"""
|
||||
request = RequestFactory().get('/?sort=name')
|
||||
request.user = AnonymousUser()
|
||||
table = DeviceTable(Device.objects.all())
|
||||
table.configure(request)
|
||||
|
||||
|
||||
class TagColumnTable(NetBoxTable):
|
||||
tags = columns.TagColumn(url_name='dcim:site_list')
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user