mirror of
https://github.com/netbox-community/netbox.git
synced 2026-04-16 22:19:53 +02:00
Compare commits
1 Commits
v4.6.0-bet
...
21688-redu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f9f64a5f29 |
@@ -1,9 +1,14 @@
|
||||
from collections import defaultdict, namedtuple
|
||||
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.db.models import Q
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from dcim.choices import CableEndChoices
|
||||
from dcim.models import CableTermination
|
||||
|
||||
PeerLookupKey = namedtuple('PeerLookupKey', ['cable_id', 'cable_end', 'connector', 'position'])
|
||||
|
||||
|
||||
class BaseCableProfile:
|
||||
"""Base class for representing a cable profile."""
|
||||
@@ -55,9 +60,14 @@ class BaseCableProfile:
|
||||
return self._mapping.get((connector, position))
|
||||
return connector, position
|
||||
|
||||
def get_peer_termination(self, termination, position):
|
||||
#
|
||||
# Peer termination resolution
|
||||
#
|
||||
|
||||
def _get_peer_lookup_key(self, termination, position):
|
||||
"""
|
||||
Given a terminating object, return the peer terminating object (if any) on the opposite end of the cable.
|
||||
Given a cabled object and a local position, return the (cable_id, cable_end, connector, position)
|
||||
tuple identifying the corresponding peer CableTermination on the opposite end of the cable.
|
||||
"""
|
||||
try:
|
||||
connector, position = self.get_mapped_position(
|
||||
@@ -70,10 +80,17 @@ class BaseCableProfile:
|
||||
f"Could not map connector {termination.cable_connector} position {position} on side "
|
||||
f"{termination.cable_end}"
|
||||
)
|
||||
return PeerLookupKey(termination.cable_id, termination.opposite_cable_end, connector, position)
|
||||
|
||||
def get_peer_termination(self, termination, position):
|
||||
"""
|
||||
Given a terminating object, return the peer terminating object (if any) on the opposite end of the cable.
|
||||
"""
|
||||
cable_id, cable_end, connector, position = self._get_peer_lookup_key(termination, position)
|
||||
try:
|
||||
ct = CableTermination.objects.get(
|
||||
cable=termination.cable,
|
||||
cable_end=termination.opposite_cable_end,
|
||||
cable_id=cable_id,
|
||||
cable_end=cable_end,
|
||||
connector=connector,
|
||||
positions__contains=[position],
|
||||
)
|
||||
@@ -81,6 +98,81 @@ class BaseCableProfile:
|
||||
except CableTermination.DoesNotExist:
|
||||
return None, None
|
||||
|
||||
def get_peer_terminations(self, term_position_pairs):
|
||||
"""
|
||||
Resolve a batch of (termination, position) pairs to peer terminations.
|
||||
"""
|
||||
if not term_position_pairs:
|
||||
return []
|
||||
|
||||
# Fast path: a single pair doesn't benefit from batching
|
||||
if len(term_position_pairs) == 1:
|
||||
return [self.get_peer_termination(*term_position_pairs[0])]
|
||||
|
||||
lookup_keys = [
|
||||
self._get_peer_lookup_key(termination, position) for termination, position in term_position_pairs
|
||||
]
|
||||
|
||||
# Group requested positions by (cable_id, cable_end, connector) so we can
|
||||
# build one overlap clause per group instead of one clause per position.
|
||||
positions_by_group = defaultdict(set)
|
||||
for key in lookup_keys:
|
||||
positions_by_group[(key.cable_id, key.cable_end, key.connector)].add(key.position)
|
||||
|
||||
q_filter = Q()
|
||||
for (cable_id, cable_end, connector), positions in positions_by_group.items():
|
||||
q_filter |= Q(
|
||||
cable_id=cable_id,
|
||||
cable_end=cable_end,
|
||||
connector=connector,
|
||||
positions__overlap=list(positions),
|
||||
)
|
||||
|
||||
peer_ct_by_key = {}
|
||||
term_ids_by_type = defaultdict(set)
|
||||
model_by_type = {}
|
||||
|
||||
for ct in CableTermination.objects.filter(q_filter).select_related('termination_type'):
|
||||
model_by_type[ct.termination_type_id] = ct.termination_type.model_class()
|
||||
term_ids_by_type[ct.termination_type_id].add(ct.termination_id)
|
||||
|
||||
group_key = (ct.cable_id, ct.cable_end, ct.connector)
|
||||
requested_positions = positions_by_group.get(group_key, ())
|
||||
|
||||
# The overlap query may return a termination carrying additional
|
||||
# positions, so only index the positions we actually requested.
|
||||
for pos in ct.positions or []:
|
||||
if pos not in requested_positions:
|
||||
continue
|
||||
|
||||
key = PeerLookupKey(ct.cable_id, ct.cable_end, ct.connector, pos)
|
||||
if key in peer_ct_by_key and peer_ct_by_key[key].pk != ct.pk:
|
||||
raise CableTermination.MultipleObjectsReturned(
|
||||
f"Multiple peer terminations found for cable {ct.cable_id} end {ct.cable_end} "
|
||||
f"connector {ct.connector} position {pos}"
|
||||
)
|
||||
peer_ct_by_key[key] = ct
|
||||
|
||||
# Use _base_manager to ensure all termination objects are loaded
|
||||
# regardless of any custom default manager filters (e.g. soft-delete).
|
||||
# Note: Django's GenericForeignKey / ContentType.get_object_for_this_type()
|
||||
# uses _default_manager, but _base_manager is safer for bulk resolution
|
||||
# during path tracing.
|
||||
term_by_type = {
|
||||
type_id: model_by_type[type_id]._base_manager.in_bulk(ids) for type_id, ids in term_ids_by_type.items()
|
||||
}
|
||||
|
||||
results = []
|
||||
for key in lookup_keys:
|
||||
ct = peer_ct_by_key.get(key)
|
||||
if ct is None:
|
||||
results.append((None, None))
|
||||
else:
|
||||
termination = term_by_type[ct.termination_type_id].get(ct.termination_id)
|
||||
results.append((termination, key.position if termination is not None else None))
|
||||
|
||||
return results
|
||||
|
||||
@staticmethod
|
||||
def get_position_list(n):
|
||||
"""Return a list of integers from 1 to n, inclusive."""
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import itertools
|
||||
import logging
|
||||
from collections import Counter
|
||||
|
||||
from django.contrib.contenttypes.fields import GenericForeignKey
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
@@ -907,25 +908,37 @@ class CablePath(models.Model):
|
||||
# Build (termination, position) pairs by matching stacked positions
|
||||
# to each termination's cable_positions. This correctly handles
|
||||
# multiple terminations on different connectors of the same cable.
|
||||
remaining = list(positions)
|
||||
remaining = Counter(positions)
|
||||
term_position_pairs = []
|
||||
for term in terminations:
|
||||
if term.cable_positions:
|
||||
for cp in term.cable_positions:
|
||||
if cp in remaining:
|
||||
if remaining[cp]:
|
||||
term_position_pairs.append((term, cp))
|
||||
remaining.remove(cp)
|
||||
remaining[cp] -= 1
|
||||
|
||||
# Fallback for when positions don't match cable_positions
|
||||
# (e.g., empty position stack yielding [None])
|
||||
if not term_position_pairs:
|
||||
term_position_pairs = [(terminations[0], pos) for pos in positions]
|
||||
|
||||
for term, pos in term_position_pairs:
|
||||
peer, new_pos = cable_profile.get_peer_termination(term, pos)
|
||||
if peer not in remote_terminations:
|
||||
peer_results = cable_profile.get_peer_terminations(term_position_pairs)
|
||||
seen = set()
|
||||
for peer, new_pos in peer_results:
|
||||
# Deduplicate peer terminations by model type & PK.
|
||||
key = None if peer is None else (peer._meta.concrete_model, peer.pk)
|
||||
if key not in seen:
|
||||
seen.add(key)
|
||||
remote_terminations.append(peer)
|
||||
new_positions.append(new_pos)
|
||||
|
||||
# If all peers resolved to None (no far-end terminations exist),
|
||||
# treat as an empty result so the path is recorded as incomplete
|
||||
# rather than falling through to the endpoint check with a stale
|
||||
# None entry.
|
||||
if remote_terminations and all(peer is None for peer in remote_terminations):
|
||||
remote_terminations = []
|
||||
|
||||
position_stack.append(new_positions)
|
||||
|
||||
# Legacy (positionless) behavior
|
||||
@@ -945,7 +958,9 @@ class CablePath(models.Model):
|
||||
if not q_filter:
|
||||
break
|
||||
|
||||
remote_cable_terminations = CableTermination.objects.filter(q_filter)
|
||||
remote_cable_terminations = CableTermination.objects.filter(q_filter).prefetch_related(
|
||||
'termination'
|
||||
)
|
||||
remote_terminations = [ct.termination for ct in remote_cable_terminations]
|
||||
else:
|
||||
# WirelessLink
|
||||
|
||||
208
netbox/dcim/tests/test_cable_profiles.py
Normal file
208
netbox/dcim/tests/test_cable_profiles.py
Normal file
@@ -0,0 +1,208 @@
|
||||
from dcim.cable_profiles import (
|
||||
Breakout1C4Px4C1PCableProfile,
|
||||
Single1C1PCableProfile,
|
||||
Single1C4PCableProfile,
|
||||
Trunk2C2PCableProfile,
|
||||
Trunk2C4PShuffleCableProfile,
|
||||
)
|
||||
from dcim.choices import CableProfileChoices
|
||||
from dcim.models import Cable, Interface
|
||||
from dcim.tests.utils import CablePathTestCase
|
||||
|
||||
|
||||
class CableProfilePeerTerminationTests(CablePathTestCase):
|
||||
"""
|
||||
Tests for BaseCableProfile.get_peer_termination() and get_peer_terminations().
|
||||
Verifies that the batch method produces identical results to calling
|
||||
the singular method in a loop.
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def setUpTestData(cls):
|
||||
super().setUpTestData()
|
||||
|
||||
# Shared pool of interfaces — cables are created per-test since they
|
||||
# mutate CableTerminations and cached fields on save.
|
||||
cls.interfaces = [Interface(device=cls.device, name=f'Interface {i}') for i in range(1, 17)]
|
||||
Interface.objects.bulk_create(cls.interfaces)
|
||||
|
||||
def _assert_batch_matches_singular(self, cable_profile, term_position_pairs):
|
||||
"""
|
||||
Helper: assert get_peer_terminations() returns the same results as
|
||||
calling get_peer_termination() individually for each pair.
|
||||
"""
|
||||
expected = [cable_profile.get_peer_termination(term, pos) for term, pos in term_position_pairs]
|
||||
actual = cable_profile.get_peer_terminations(term_position_pairs)
|
||||
self.assertEqual(len(actual), len(expected))
|
||||
for i, (exp, act) in enumerate(zip(expected, actual)):
|
||||
exp_peer, exp_pos = exp
|
||||
act_peer, act_pos = act
|
||||
self.assertEqual(
|
||||
(type(act_peer), getattr(act_peer, 'pk', None), act_pos),
|
||||
(type(exp_peer), getattr(exp_peer, 'pk', None), exp_pos),
|
||||
msg=f'Mismatch at index {i}: expected {exp}, got {act}',
|
||||
)
|
||||
|
||||
def test_empty_pairs(self):
|
||||
"""
|
||||
get_peer_terminations() with an empty list returns an empty list.
|
||||
"""
|
||||
profile = Single1C1PCableProfile()
|
||||
self.assertEqual(profile.get_peer_terminations([]), [])
|
||||
|
||||
def test_single_pair_fast_path(self):
|
||||
"""
|
||||
A single-pair call should use the fast path and produce the same
|
||||
result as get_peer_termination().
|
||||
"""
|
||||
cable = Cable(
|
||||
profile=CableProfileChoices.SINGLE_1C1P,
|
||||
a_terminations=[self.interfaces[0]],
|
||||
b_terminations=[self.interfaces[1]],
|
||||
)
|
||||
cable.clean()
|
||||
cable.save()
|
||||
|
||||
self.interfaces[0].refresh_from_db()
|
||||
profile = Single1C1PCableProfile()
|
||||
|
||||
self._assert_batch_matches_singular(profile, [(self.interfaces[0], 1)])
|
||||
|
||||
def test_single_connector_multi_position(self):
|
||||
"""
|
||||
Batch resolution on a Single 1C4P profile should return the same
|
||||
peers as individual lookups for each position.
|
||||
"""
|
||||
cable = Cable(
|
||||
profile=CableProfileChoices.SINGLE_1C4P,
|
||||
a_terminations=[self.interfaces[0]],
|
||||
b_terminations=[self.interfaces[4]],
|
||||
)
|
||||
cable.clean()
|
||||
cable.save()
|
||||
|
||||
self.interfaces[0].refresh_from_db()
|
||||
profile = Single1C4PCableProfile()
|
||||
|
||||
# Query all 4 positions from the A-side termination
|
||||
pairs = [(self.interfaces[0], pos) for pos in range(1, 5)]
|
||||
self._assert_batch_matches_singular(profile, pairs)
|
||||
|
||||
def test_multi_connector_multi_position(self):
|
||||
"""
|
||||
Batch resolution on a Trunk 2C2P profile across both connectors
|
||||
should match individual lookups.
|
||||
"""
|
||||
cable = Cable(
|
||||
profile=CableProfileChoices.TRUNK_2C2P,
|
||||
a_terminations=[self.interfaces[0], self.interfaces[1]],
|
||||
b_terminations=[self.interfaces[2], self.interfaces[3]],
|
||||
)
|
||||
cable.clean()
|
||||
cable.save()
|
||||
|
||||
for iface in self.interfaces[:4]:
|
||||
iface.refresh_from_db()
|
||||
profile = Trunk2C2PCableProfile()
|
||||
|
||||
# Build pairs for both A-side terminations across their positions
|
||||
pairs = []
|
||||
for iface in self.interfaces[:2]:
|
||||
for pos in iface.cable_positions:
|
||||
pairs.append((iface, pos))
|
||||
|
||||
self._assert_batch_matches_singular(profile, pairs)
|
||||
|
||||
def test_shuffle_profile_mapping(self):
|
||||
"""
|
||||
Batch resolution on a shuffle profile should correctly apply the
|
||||
non-linear position mapping.
|
||||
"""
|
||||
cable = Cable(
|
||||
profile=CableProfileChoices.TRUNK_2C4P_SHUFFLE,
|
||||
a_terminations=[self.interfaces[0], self.interfaces[1]],
|
||||
b_terminations=[self.interfaces[2], self.interfaces[3]],
|
||||
)
|
||||
cable.clean()
|
||||
cable.save()
|
||||
|
||||
for iface in self.interfaces[:4]:
|
||||
iface.refresh_from_db()
|
||||
profile = Trunk2C4PShuffleCableProfile()
|
||||
|
||||
pairs = []
|
||||
for iface in self.interfaces[:2]:
|
||||
for pos in iface.cable_positions:
|
||||
pairs.append((iface, pos))
|
||||
|
||||
self._assert_batch_matches_singular(profile, pairs)
|
||||
|
||||
def test_breakout_profile(self):
|
||||
"""
|
||||
Batch resolution on a breakout profile should correctly map A-side
|
||||
positions to different B-side connectors.
|
||||
"""
|
||||
cable = Cable(
|
||||
profile=CableProfileChoices.BREAKOUT_1C4P_4C1P,
|
||||
a_terminations=[self.interfaces[8]],
|
||||
b_terminations=self.interfaces[9:13],
|
||||
)
|
||||
cable.clean()
|
||||
cable.save()
|
||||
|
||||
self.interfaces[8].refresh_from_db()
|
||||
for iface in self.interfaces[9:13]:
|
||||
iface.refresh_from_db()
|
||||
profile = Breakout1C4Px4C1PCableProfile()
|
||||
|
||||
# Test A→B direction (one connector, 4 positions → 4 connectors)
|
||||
a_pairs = [(self.interfaces[8], pos) for pos in self.interfaces[8].cable_positions]
|
||||
self._assert_batch_matches_singular(profile, a_pairs)
|
||||
|
||||
# Test B→A direction (4 connectors, 1 position each → one connector)
|
||||
b_pairs = [(iface, 1) for iface in self.interfaces[9:13]]
|
||||
self._assert_batch_matches_singular(profile, b_pairs)
|
||||
|
||||
def test_multi_position_single_termination(self):
|
||||
"""
|
||||
When a single-connector multi-position profile has only one termination
|
||||
per side, all positions should resolve to the same peer object. The batch
|
||||
method must return identical results to the singular method for each.
|
||||
"""
|
||||
# Use a multi-position profile but only connect one termination per side.
|
||||
# The CableTermination will have positions=[1,2,3,4] but only one object
|
||||
# is attached, so querying any position still resolves to that same peer.
|
||||
cable = Cable(
|
||||
profile=CableProfileChoices.SINGLE_1C4P,
|
||||
a_terminations=[self.interfaces[0]],
|
||||
b_terminations=[self.interfaces[1]],
|
||||
)
|
||||
cable.clean()
|
||||
cable.save()
|
||||
|
||||
self.interfaces[0].refresh_from_db()
|
||||
profile = Single1C4PCableProfile()
|
||||
|
||||
# All 4 positions should resolve to the same B-side termination
|
||||
pairs = [(self.interfaces[0], pos) for pos in range(1, 5)]
|
||||
self._assert_batch_matches_singular(profile, pairs)
|
||||
|
||||
def test_duplicate_pairs(self):
|
||||
"""
|
||||
Submitting the same (termination, position) pair multiple times should
|
||||
return the correct result for each occurrence without errors.
|
||||
"""
|
||||
cable = Cable(
|
||||
profile=CableProfileChoices.SINGLE_1C1P,
|
||||
a_terminations=[self.interfaces[0]],
|
||||
b_terminations=[self.interfaces[1]],
|
||||
)
|
||||
cable.clean()
|
||||
cable.save()
|
||||
|
||||
self.interfaces[0].refresh_from_db()
|
||||
profile = Single1C1PCableProfile()
|
||||
|
||||
# The same pair submitted three times
|
||||
pairs = [(self.interfaces[0], 1)] * 3
|
||||
self._assert_batch_matches_singular(profile, pairs)
|
||||
@@ -14,6 +14,7 @@ class CablePathTests(CablePathTestCase):
|
||||
Tests are numbered as follows:
|
||||
1XX: Test direct connections using each profile
|
||||
2XX: Topology tests replicated from the legacy test case and adapted to use profiles
|
||||
3XX: Dynamic port mapping and termination changes
|
||||
"""
|
||||
|
||||
def test_101_cable_profile_single_1c1p(self):
|
||||
@@ -1223,6 +1224,98 @@ class CablePathTests(CablePathTestCase):
|
||||
# Test SVG generation
|
||||
CableTraceSVG(interfaces[0]).render()
|
||||
|
||||
def test_110_partial_termination_profiled_trunk(self):
|
||||
"""
|
||||
Tests that tracing through a partially terminated profiled cable
|
||||
produces a complete path for the connected pair and an incomplete
|
||||
path for the unconnected pair, without errors. Also verifies that
|
||||
attaching the missing termination completes the previously incomplete path.
|
||||
|
||||
[IF1] --C1-- [IF3]
|
||||
[IF2] (empty)
|
||||
|
||||
Cable profile: Trunk 2C1P with only one B-side termination.
|
||||
"""
|
||||
interfaces = [
|
||||
Interface.objects.create(device=self.device, name='Interface 1'),
|
||||
Interface.objects.create(device=self.device, name='Interface 2'),
|
||||
Interface.objects.create(device=self.device, name='Interface 3'),
|
||||
]
|
||||
|
||||
# Create a 2-connector trunk cable with both A-side connectors
|
||||
# populated but only one B-side connector terminated.
|
||||
cable1 = Cable(
|
||||
profile=CableProfileChoices.TRUNK_2C1P,
|
||||
a_terminations=[interfaces[0], interfaces[1]],
|
||||
b_terminations=[interfaces[2]],
|
||||
)
|
||||
cable1.clean()
|
||||
cable1.save()
|
||||
|
||||
# IF1 (connector 1) → IF3 (connector 1): complete path
|
||||
path1 = self.assertPathExists(
|
||||
(interfaces[0], cable1, interfaces[2]),
|
||||
is_complete=True,
|
||||
is_active=True,
|
||||
)
|
||||
|
||||
# IF3 (connector 1) → IF1 (connector 1): complete path (reverse)
|
||||
path2 = self.assertPathExists(
|
||||
(interfaces[2], cable1, interfaces[0]),
|
||||
is_complete=True,
|
||||
is_active=True,
|
||||
)
|
||||
|
||||
# IF2 (connector 2) has no B-side peer.
|
||||
# Tracing should stop at this segment, and the resulting path
|
||||
# should remain incomplete.
|
||||
# Verify via the origin's _path reference rather than matching
|
||||
# the exact path shape directly.
|
||||
interfaces[1].refresh_from_db()
|
||||
self.assertIsNotNone(interfaces[1]._path_id)
|
||||
path3 = CablePath.objects.get(pk=interfaces[1]._path_id)
|
||||
self.assertFalse(path3.is_complete)
|
||||
self.assertTrue(path3.is_active)
|
||||
|
||||
for iface in interfaces:
|
||||
iface.refresh_from_db()
|
||||
self.assertPathIsSet(interfaces[0], path1)
|
||||
self.assertPathIsSet(interfaces[2], path2)
|
||||
self.assertPathIsSet(interfaces[1], path3)
|
||||
|
||||
# Verify connector/position assignments
|
||||
self.assertEqual(interfaces[0].cable_connector, 1)
|
||||
self.assertEqual(interfaces[0].cable_positions, [1])
|
||||
self.assertEqual(interfaces[1].cable_connector, 2)
|
||||
self.assertEqual(interfaces[1].cable_positions, [1])
|
||||
self.assertEqual(interfaces[2].cable_connector, 1)
|
||||
self.assertEqual(interfaces[2].cable_positions, [1])
|
||||
|
||||
# Now attach the missing B-side termination and verify the
|
||||
# previously incomplete path becomes complete.
|
||||
interface4 = Interface.objects.create(device=self.device, name='Interface 4')
|
||||
cable1.b_terminations = [interfaces[2], interface4]
|
||||
cable1.clean()
|
||||
cable1.save()
|
||||
|
||||
path4 = self.assertPathExists(
|
||||
(interfaces[1], cable1, interface4),
|
||||
is_complete=True,
|
||||
is_active=True,
|
||||
)
|
||||
path5 = self.assertPathExists(
|
||||
(interface4, cable1, interfaces[1]),
|
||||
is_complete=True,
|
||||
is_active=True,
|
||||
)
|
||||
|
||||
interfaces[1].refresh_from_db()
|
||||
interface4.refresh_from_db()
|
||||
self.assertPathIsSet(interfaces[1], path4)
|
||||
self.assertPathIsSet(interface4, path5)
|
||||
self.assertEqual(interface4.cable_connector, 2)
|
||||
self.assertEqual(interface4.cable_positions, [1])
|
||||
|
||||
def test_202_single_path_via_pass_through_with_breakouts(self):
|
||||
"""
|
||||
[IF1] --C1-- [FP1] [RP1] --C2-- [IF3]
|
||||
|
||||
Reference in New Issue
Block a user