mirror of
https://github.com/netbox-community/netbox.git
synced 2026-04-01 23:23:24 +02:00
Compare commits
1 Commits
20924-plug
...
21498-even
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c720b07538 |
@@ -38,7 +38,19 @@ class ConnectedEndpointsSerializer(serializers.ModelSerializer):
|
||||
|
||||
@extend_schema_field(serializers.BooleanField)
|
||||
def get_connected_endpoints_reachable(self, obj):
|
||||
return obj._path and obj._path.is_complete and obj._path.is_active
|
||||
"""
|
||||
Determines if the connected endpoints are reachable through a cable path.
|
||||
|
||||
This method checks whether there is a valid and active cable path that
|
||||
connects the endpoints. It evaluates both the completeness and active
|
||||
status of the path to determine reachability.
|
||||
"""
|
||||
# Use the public `path` accessor rather than dereferencing `_path`
|
||||
# directly. `path` already handles the stale in-memory relation case
|
||||
# that can occur while CablePath rows are rebuilt during cable edits.
|
||||
if path := obj.path:
|
||||
return path.is_complete and path.is_active
|
||||
return False
|
||||
|
||||
|
||||
class PortSerializer(serializers.ModelSerializer):
|
||||
|
||||
@@ -2,7 +2,7 @@ from functools import cached_property
|
||||
|
||||
from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation
|
||||
from django.contrib.postgres.fields import ArrayField
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.core.exceptions import ObjectDoesNotExist, ValidationError
|
||||
from django.core.validators import MaxValueValidator, MinValueValidator
|
||||
from django.db import models
|
||||
from django.db.models import Sum
|
||||
@@ -298,20 +298,24 @@ class CabledObjectModel(models.Model):
|
||||
|
||||
class PathEndpoint(models.Model):
|
||||
"""
|
||||
An abstract model inherited by any CabledObjectModel subclass which represents the end of a CablePath; specifically,
|
||||
these include ConsolePort, ConsoleServerPort, PowerPort, PowerOutlet, Interface, and PowerFeed.
|
||||
An abstract model inherited by any CabledObjectModel subclass which
|
||||
represents the end of a CablePath; specifically, these include
|
||||
ConsolePort, ConsoleServerPort, PowerPort, PowerOutlet, Interface, and
|
||||
PowerFeed.
|
||||
|
||||
`_path` references the CablePath originating from this instance, if any. It is set or cleared by the receivers in
|
||||
dcim.signals in response to changes in the cable path, and complements the `origin` GenericForeignKey field on the
|
||||
CablePath model. `_path` should not be accessed directly; rather, use the `path` property.
|
||||
|
||||
`connected_endpoints()` is a convenience method for returning the destination of the associated CablePath, if any.
|
||||
`_path` references the CablePath originating from this instance, if any.
|
||||
It is set or cleared by the receivers in dcim.signals in response to
|
||||
changes in the cable path, and complements the `origin` GenericForeignKey
|
||||
field on the CablePath model. `_path` should not be accessed directly;
|
||||
rather, use the `path` property. `connected_endpoints()` is a convenience
|
||||
method for returning the destination of the associated CablePath, if any.
|
||||
"""
|
||||
|
||||
_path = models.ForeignKey(
|
||||
to='dcim.CablePath',
|
||||
on_delete=models.SET_NULL,
|
||||
null=True,
|
||||
blank=True
|
||||
blank=True,
|
||||
)
|
||||
|
||||
class Meta:
|
||||
@@ -323,39 +327,74 @@ class PathEndpoint(models.Model):
|
||||
|
||||
# Construct the complete path (including e.g. bridged interfaces)
|
||||
while origin is not None:
|
||||
|
||||
if origin._path is None:
|
||||
# Go through the public accessor rather than dereferencing `_path`
|
||||
# directly. During cable edits, CablePath rows can be deleted and
|
||||
# recreated while this endpoint instance is still in memory.
|
||||
cable_path = origin.path
|
||||
if cable_path is None:
|
||||
break
|
||||
|
||||
path.extend(origin._path.path_objects)
|
||||
path.extend(cable_path.path_objects)
|
||||
|
||||
# If the path ends at a non-connected pass-through port, pad out the link and far-end terminations
|
||||
# If the path ends at a non-connected pass-through port, pad out
|
||||
# the link and far-end terminations.
|
||||
if len(path) % 3 == 1:
|
||||
path.extend(([], []))
|
||||
# If the path ends at a site or provider network, inject a null "link" to render an attachment
|
||||
|
||||
# If the path ends at a site or provider network, inject a null
|
||||
# "link" to render an attachment.
|
||||
elif len(path) % 3 == 2:
|
||||
path.insert(-1, [])
|
||||
|
||||
# Check for a bridged relationship to continue the trace
|
||||
destinations = origin._path.destinations
|
||||
# Check for a bridged relationship to continue the trace.
|
||||
destinations = cable_path.destinations
|
||||
if len(destinations) == 1:
|
||||
origin = getattr(destinations[0], 'bridge', None)
|
||||
else:
|
||||
origin = None
|
||||
|
||||
# Return the path as a list of three-tuples (A termination(s), cable(s), B termination(s))
|
||||
# Return the path as a list of three-tuples
|
||||
# (A termination(s), cable(s), B termination(s))
|
||||
return list(zip(*[iter(path)] * 3))
|
||||
|
||||
@property
|
||||
def path(self):
|
||||
return self._path
|
||||
"""
|
||||
Return this endpoint's current CablePath, if any.
|
||||
|
||||
`_path` is a denormalized reference that is updated from CablePath
|
||||
save/delete handlers, including queryset.update() calls on origin
|
||||
endpoints. That means an already-instantiated endpoint can briefly hold
|
||||
a stale in-memory `_path` relation while the database already points to
|
||||
a different CablePath (or to no path at all).
|
||||
|
||||
If the cached relation points to a CablePath that has just been
|
||||
deleted, refresh only the `_path` field from the database and retry.
|
||||
This keeps the fix cheap and narrowly scoped to the denormalized FK.
|
||||
"""
|
||||
if self._path_id is None:
|
||||
return None
|
||||
|
||||
try:
|
||||
return self._path
|
||||
except ObjectDoesNotExist:
|
||||
# Refresh only the denormalized FK instead of the whole model.
|
||||
# The expected problem here is in-memory staleness during path
|
||||
# rebuilds, not persistent database corruption.
|
||||
self.refresh_from_db(fields=['_path'])
|
||||
return self._path if self._path_id else None
|
||||
|
||||
@cached_property
|
||||
def connected_endpoints(self):
|
||||
"""
|
||||
Caching accessor for the attached CablePath's destination (if any)
|
||||
Caching accessor for the attached CablePath's destinations (if any).
|
||||
|
||||
Always route through `path` so stale in-memory `_path` references are
|
||||
repaired before we cache the result for the lifetime of this instance.
|
||||
"""
|
||||
return self._path.destinations if self._path else []
|
||||
if cable_path := self.path:
|
||||
return cable_path.destinations
|
||||
return []
|
||||
|
||||
|
||||
#
|
||||
|
||||
@@ -5,6 +5,7 @@ from circuits.models import *
|
||||
from core.models import ObjectType
|
||||
from dcim.choices import *
|
||||
from dcim.models import *
|
||||
from extras.events import serialize_for_event
|
||||
from extras.models import CustomField
|
||||
from ipam.models import Prefix
|
||||
from netbox.choices import WeightUnitChoices
|
||||
@@ -1274,6 +1275,65 @@ class CableTestCase(TestCase):
|
||||
self.assertEqual(a_terms, [interface1])
|
||||
self.assertEqual(b_terms, [interface2])
|
||||
|
||||
@tag('regression') # #21498
|
||||
def test_path_refreshes_replaced_cablepath_reference(self):
|
||||
"""
|
||||
An already-instantiated interface should refresh its denormalized
|
||||
`_path` foreign key when the referenced CablePath row has been
|
||||
replaced in the database.
|
||||
"""
|
||||
stale_interface = Interface.objects.get(device__name='TestDevice1', name='eth0')
|
||||
old_path = CablePath.objects.get(pk=stale_interface._path_id)
|
||||
|
||||
new_path = CablePath(
|
||||
path=old_path.path,
|
||||
is_active=old_path.is_active,
|
||||
is_complete=old_path.is_complete,
|
||||
is_split=old_path.is_split,
|
||||
)
|
||||
old_path_id = old_path.pk
|
||||
old_path.delete()
|
||||
new_path.save()
|
||||
|
||||
# The old CablePath no longer exists
|
||||
self.assertFalse(CablePath.objects.filter(pk=old_path_id).exists())
|
||||
|
||||
# The already-instantiated interface still points to the deleted path
|
||||
# until the accessor refreshes `_path` from the database.
|
||||
self.assertEqual(stale_interface._path_id, old_path_id)
|
||||
self.assertEqual(stale_interface.path.pk, new_path.pk)
|
||||
|
||||
@tag('regression') # #21498
|
||||
def test_serialize_for_event_handles_stale_cablepath_reference_after_retermination(self):
|
||||
"""
|
||||
Serializing an interface whose previously cached `_path` row has been
|
||||
deleted during cable retermination must not raise.
|
||||
"""
|
||||
stale_interface = Interface.objects.get(device__name='TestDevice2', name='eth0')
|
||||
old_path_id = stale_interface._path_id
|
||||
new_peer = Interface.objects.get(device__name='TestDevice2', name='eth1')
|
||||
cable = stale_interface.cable
|
||||
|
||||
self.assertIsNotNone(cable)
|
||||
self.assertIsNotNone(old_path_id)
|
||||
self.assertEqual(stale_interface.cable_end, 'B')
|
||||
|
||||
cable.b_terminations = [new_peer]
|
||||
cable.save()
|
||||
|
||||
# The old CablePath was deleted during retrace.
|
||||
self.assertFalse(CablePath.objects.filter(pk=old_path_id).exists())
|
||||
|
||||
# The stale in-memory instance still holds the deleted FK value.
|
||||
self.assertEqual(stale_interface._path_id, old_path_id)
|
||||
|
||||
# Serialization must not raise ObjectDoesNotExist. Because this interface
|
||||
# was the former B-side termination, it is now disconnected.
|
||||
data = serialize_for_event(stale_interface)
|
||||
self.assertIsNone(data['connected_endpoints'])
|
||||
self.assertIsNone(data['connected_endpoints_type'])
|
||||
self.assertFalse(data['connected_endpoints_reachable'])
|
||||
|
||||
|
||||
class VirtualDeviceContextTestCase(TestCase):
|
||||
|
||||
|
||||
@@ -25,22 +25,59 @@ logger = logging.getLogger('netbox.events_processor')
|
||||
|
||||
class EventContext(UserDict):
|
||||
"""
|
||||
A custom dictionary that automatically serializes its associated object on demand.
|
||||
Dictionary-compatible wrapper for queued events that lazily serializes
|
||||
``event['data']`` on first access.
|
||||
|
||||
Backward-compatible with the plain-dict interface expected by existing
|
||||
EVENTS_PIPELINE consumers. When the same object is enqueued more than once
|
||||
in a single request, the serialization source is updated so consumers see
|
||||
the latest state.
|
||||
"""
|
||||
|
||||
# We're emulating a dictionary here (rather than using a custom class) because prior to NetBox v4.5.2, events were
|
||||
# queued as dictionaries for processing by handles in EVENTS_PIPELINE. We need to avoid introducing any breaking
|
||||
# changes until a suitable minor release.
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
# Track which model instance should be serialized if/when `data` is
|
||||
# requested. This may be refreshed on duplicate enqueue, while leaving
|
||||
# the public `object` entry untouched for compatibility.
|
||||
self._serialization_source = self.get('object')
|
||||
|
||||
def refresh_serialization_source(self, instance):
|
||||
"""
|
||||
Point lazy serialization at a fresher instance, invalidating any
|
||||
already-materialized ``data``.
|
||||
"""
|
||||
self._serialization_source = instance
|
||||
# UserDict.__contains__ checks the backing dict directly, so `in`
|
||||
# does not trigger __getitem__'s lazy serialization.
|
||||
if 'data' in self:
|
||||
del self['data']
|
||||
|
||||
def freeze_data(self, instance):
|
||||
"""
|
||||
Eagerly serialize and cache the payload for delete events, where the
|
||||
object may become inaccessible after deletion.
|
||||
"""
|
||||
super().__setitem__('data', serialize_for_event(instance))
|
||||
self._serialization_source = None
|
||||
|
||||
def __getitem__(self, item):
|
||||
if item == 'data' and 'data' not in self:
|
||||
data = serialize_for_event(self['object'])
|
||||
self.__setitem__('data', data)
|
||||
# Materialize the payload only when an event consumer asks for it.
|
||||
#
|
||||
# On coalesced events, use the latest explicitly queued instance so
|
||||
# webhooks/scripts/notifications observe the final queued state for
|
||||
# that object within the request.
|
||||
source = self._serialization_source or super().__getitem__('object')
|
||||
super().__setitem__('data', serialize_for_event(source))
|
||||
|
||||
return super().__getitem__(item)
|
||||
|
||||
|
||||
def serialize_for_event(instance):
|
||||
"""
|
||||
Return a serialized representation of the given instance suitable for use in a queued event.
|
||||
Return a serialized representation of the given instance suitable for use
|
||||
in a queued event.
|
||||
"""
|
||||
serializer_class = get_serializer_for_model(instance.__class__)
|
||||
serializer_context = {
|
||||
@@ -53,7 +90,8 @@ def serialize_for_event(instance):
|
||||
|
||||
def get_snapshots(instance, event_type):
|
||||
"""
|
||||
Return a dictionary of pre- and post-change snapshots for the given instance.
|
||||
Return a dictionary of pre- and post-change snapshots for the given
|
||||
instance.
|
||||
"""
|
||||
if event_type == OBJECT_DELETED:
|
||||
# Post-change snapshot must be empty for deleted objects
|
||||
@@ -76,8 +114,9 @@ def get_snapshots(instance, event_type):
|
||||
|
||||
def enqueue_event(queue, instance, request, event_type):
|
||||
"""
|
||||
Enqueue a serialized representation of a created/updated/deleted object for the processing of
|
||||
events once the request has completed.
|
||||
Enqueue (or coalesce) an event for a created/updated/deleted object.
|
||||
|
||||
Events are processed after the request completes.
|
||||
"""
|
||||
# Bail if this type of object does not support event rules
|
||||
if not has_feature(instance, 'event_rules'):
|
||||
@@ -88,11 +127,18 @@ def enqueue_event(queue, instance, request, event_type):
|
||||
|
||||
assert instance.pk is not None
|
||||
key = f'{app_label}.{model_name}:{instance.pk}'
|
||||
|
||||
if key in queue:
|
||||
queue[key]['snapshots']['postchange'] = get_snapshots(instance, event_type)['postchange']
|
||||
# If the object is being deleted, update any prior "update" event to "delete"
|
||||
|
||||
# If the object is being deleted, convert any prior update event into a
|
||||
# delete event and freeze the payload before the object (or related
|
||||
# rows) become inaccessible.
|
||||
if event_type == OBJECT_DELETED:
|
||||
queue[key]['event_type'] = event_type
|
||||
else:
|
||||
# Keep the public `object` entry stable for compatibility.
|
||||
queue[key].refresh_serialization_source(instance)
|
||||
else:
|
||||
queue[key] = EventContext(
|
||||
object_type=ObjectType.objects.get_for_model(instance),
|
||||
@@ -106,14 +152,16 @@ def enqueue_event(queue, instance, request, event_type):
|
||||
username=request.user.username, # DEPRECATED, will be removed in NetBox v4.7.0
|
||||
request_id=request.id, # DEPRECATED, will be removed in NetBox v4.7.0
|
||||
)
|
||||
# Force serialization of objects prior to them actually being deleted
|
||||
|
||||
# For delete events, eagerly serialize the payload before the row is gone.
|
||||
# This covers both first-time enqueues and coalesced update→delete promotions.
|
||||
if event_type == OBJECT_DELETED:
|
||||
queue[key]['data'] = serialize_for_event(instance)
|
||||
queue[key].freeze_data(instance)
|
||||
|
||||
|
||||
def process_event_rules(event_rules, object_type, event):
|
||||
"""
|
||||
Process a list of EventRules against an event.
|
||||
Evaluate and dispatch a list of EventRules against an event.
|
||||
|
||||
Notes on event sources:
|
||||
- Object change events (created/updated/deleted) are enqueued via
|
||||
@@ -133,9 +181,9 @@ def process_event_rules(event_rules, object_type, event):
|
||||
if not event_rule.eval_conditions(event['data']):
|
||||
continue
|
||||
|
||||
# Compile event data
|
||||
event_data = event_rule.action_data or {}
|
||||
event_data.update(event['data'])
|
||||
# Merge rule-specific action_data with the event payload.
|
||||
# Copy to avoid mutating the rule's stored action_data dict.
|
||||
event_data = {**(event_rule.action_data or {}), **event['data']}
|
||||
|
||||
# Webhooks
|
||||
if event_rule.action_type == EventRuleActionChoices.WEBHOOK:
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
import json
|
||||
import uuid
|
||||
from unittest import skipIf
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
import django_rq
|
||||
from django.conf import settings
|
||||
from django.http import HttpResponse
|
||||
from django.test import RequestFactory
|
||||
from django.urls import reverse
|
||||
@@ -343,6 +345,7 @@ class EventRuleTest(APITestCase):
|
||||
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], sites[i].name)
|
||||
self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
|
||||
|
||||
@skipIf('netbox.tests.dummy_plugin' not in settings.PLUGINS, 'dummy_plugin not in settings.PLUGINS')
|
||||
def test_send_webhook(self):
|
||||
request_id = uuid.uuid4()
|
||||
|
||||
@@ -426,6 +429,97 @@ class EventRuleTest(APITestCase):
|
||||
self.assertEqual(job.kwargs['object_type'], script_type)
|
||||
self.assertEqual(job.kwargs['username'], self.user.username)
|
||||
|
||||
def test_duplicate_enqueue_refreshes_lazy_payload(self):
|
||||
"""
|
||||
When the same object is enqueued more than once in a single request,
|
||||
lazy serialization should use the most recently enqueued instance while
|
||||
preserving the original event['object'] reference.
|
||||
"""
|
||||
request = RequestFactory().get(reverse('dcim:site_add'))
|
||||
request.id = uuid.uuid4()
|
||||
request.user = self.user
|
||||
|
||||
site = Site.objects.create(name='Site 1', slug='site-1')
|
||||
stale_site = Site.objects.get(pk=site.pk)
|
||||
|
||||
queue = {}
|
||||
enqueue_event(queue, stale_site, request, OBJECT_UPDATED)
|
||||
|
||||
event = queue[f'dcim.site:{site.pk}']
|
||||
|
||||
# Data should not be materialized yet (lazy serialization)
|
||||
self.assertNotIn('data', event.data)
|
||||
|
||||
fresh_site = Site.objects.get(pk=site.pk)
|
||||
fresh_site.description = 'foo'
|
||||
fresh_site.save()
|
||||
|
||||
enqueue_event(queue, fresh_site, request, OBJECT_UPDATED)
|
||||
|
||||
# The original object reference should be preserved
|
||||
self.assertIs(event['object'], stale_site)
|
||||
|
||||
# But serialized data should reflect the fresher instance
|
||||
self.assertEqual(event['data']['description'], 'foo')
|
||||
self.assertEqual(event['snapshots']['postchange']['description'], 'foo')
|
||||
|
||||
def test_duplicate_enqueue_invalidates_materialized_data(self):
|
||||
"""
|
||||
If event['data'] has already been materialized before a second enqueue
|
||||
for the same object, the stale payload should be discarded and rebuilt
|
||||
from the fresher instance on next access.
|
||||
"""
|
||||
request = RequestFactory().get(reverse('dcim:site_add'))
|
||||
request.id = uuid.uuid4()
|
||||
request.user = self.user
|
||||
|
||||
site = Site.objects.create(name='Site 1', slug='site-1')
|
||||
|
||||
queue = {}
|
||||
enqueue_event(queue, site, request, OBJECT_UPDATED)
|
||||
|
||||
event = queue[f'dcim.site:{site.pk}']
|
||||
|
||||
# Force early materialization
|
||||
self.assertEqual(event['data']['description'], '')
|
||||
|
||||
# Now update and re-enqueue
|
||||
fresh_site = Site.objects.get(pk=site.pk)
|
||||
fresh_site.description = 'updated'
|
||||
fresh_site.save()
|
||||
|
||||
enqueue_event(queue, fresh_site, request, OBJECT_UPDATED)
|
||||
|
||||
# Stale data should have been invalidated; new access should reflect update
|
||||
self.assertEqual(event['data']['description'], 'updated')
|
||||
|
||||
def test_update_then_delete_enqueue_freezes_payload(self):
|
||||
"""
|
||||
When an update event is coalesced with a subsequent delete, the event
|
||||
type should be promoted to OBJECT_DELETED and the payload should be
|
||||
eagerly frozen (since the object will be inaccessible after deletion).
|
||||
"""
|
||||
request = RequestFactory().get(reverse('dcim:site_add'))
|
||||
request.id = uuid.uuid4()
|
||||
request.user = self.user
|
||||
|
||||
site = Site.objects.create(name='Site 1', slug='site-1')
|
||||
|
||||
queue = {}
|
||||
enqueue_event(queue, site, request, OBJECT_UPDATED)
|
||||
|
||||
event = queue[f'dcim.site:{site.pk}']
|
||||
|
||||
enqueue_event(queue, site, request, OBJECT_DELETED)
|
||||
|
||||
# Event type should have been promoted
|
||||
self.assertEqual(event['event_type'], OBJECT_DELETED)
|
||||
|
||||
# Data should already be materialized (frozen), not lazy
|
||||
self.assertIn('data', event.data)
|
||||
self.assertEqual(event['data']['name'], 'Site 1')
|
||||
self.assertIsNone(event['snapshots']['postchange'])
|
||||
|
||||
def test_duplicate_triggers(self):
|
||||
"""
|
||||
Test for erroneous duplicate event triggers resulting from saving an object multiple times
|
||||
|
||||
Reference in New Issue
Block a user