Compare commits

..

1 Commits

Author SHA1 Message Date
Martin Hauser
f9f64a5f29 perf(dcim): Batch peer termination lookups in Cable Path Tracing
Add `get_peer_terminations()` to resolve multiple cable terminations in
a single query, reducing N+1 queries during path tracing. Update path
resolution to use batched lookup and deduplicate peers by identity.

Fixes #21688
2026-04-15 19:35:06 +02:00
4 changed files with 419 additions and 11 deletions

View File

@@ -1,9 +1,14 @@
from collections import defaultdict, namedtuple
from django.core.exceptions import ValidationError
from django.db.models import Q
from django.utils.translation import gettext_lazy as _
from dcim.choices import CableEndChoices
from dcim.models import CableTermination
PeerLookupKey = namedtuple('PeerLookupKey', ['cable_id', 'cable_end', 'connector', 'position'])
class BaseCableProfile:
"""Base class for representing a cable profile."""
@@ -55,9 +60,14 @@ class BaseCableProfile:
return self._mapping.get((connector, position))
return connector, position
def get_peer_termination(self, termination, position):
#
# Peer termination resolution
#
def _get_peer_lookup_key(self, termination, position):
"""
Given a terminating object, return the peer terminating object (if any) on the opposite end of the cable.
Given a cabled object and a local position, return the (cable_id, cable_end, connector, position)
tuple identifying the corresponding peer CableTermination on the opposite end of the cable.
"""
try:
connector, position = self.get_mapped_position(
@@ -70,10 +80,17 @@ class BaseCableProfile:
f"Could not map connector {termination.cable_connector} position {position} on side "
f"{termination.cable_end}"
)
return PeerLookupKey(termination.cable_id, termination.opposite_cable_end, connector, position)
def get_peer_termination(self, termination, position):
"""
Given a terminating object, return the peer terminating object (if any) on the opposite end of the cable.
"""
cable_id, cable_end, connector, position = self._get_peer_lookup_key(termination, position)
try:
ct = CableTermination.objects.get(
cable=termination.cable,
cable_end=termination.opposite_cable_end,
cable_id=cable_id,
cable_end=cable_end,
connector=connector,
positions__contains=[position],
)
@@ -81,6 +98,81 @@ class BaseCableProfile:
except CableTermination.DoesNotExist:
return None, None
def get_peer_terminations(self, term_position_pairs):
"""
Resolve a batch of (termination, position) pairs to peer terminations.
"""
if not term_position_pairs:
return []
# Fast path: a single pair doesn't benefit from batching
if len(term_position_pairs) == 1:
return [self.get_peer_termination(*term_position_pairs[0])]
lookup_keys = [
self._get_peer_lookup_key(termination, position) for termination, position in term_position_pairs
]
# Group requested positions by (cable_id, cable_end, connector) so we can
# build one overlap clause per group instead of one clause per position.
positions_by_group = defaultdict(set)
for key in lookup_keys:
positions_by_group[(key.cable_id, key.cable_end, key.connector)].add(key.position)
q_filter = Q()
for (cable_id, cable_end, connector), positions in positions_by_group.items():
q_filter |= Q(
cable_id=cable_id,
cable_end=cable_end,
connector=connector,
positions__overlap=list(positions),
)
peer_ct_by_key = {}
term_ids_by_type = defaultdict(set)
model_by_type = {}
for ct in CableTermination.objects.filter(q_filter).select_related('termination_type'):
model_by_type[ct.termination_type_id] = ct.termination_type.model_class()
term_ids_by_type[ct.termination_type_id].add(ct.termination_id)
group_key = (ct.cable_id, ct.cable_end, ct.connector)
requested_positions = positions_by_group.get(group_key, ())
# The overlap query may return a termination carrying additional
# positions, so only index the positions we actually requested.
for pos in ct.positions or []:
if pos not in requested_positions:
continue
key = PeerLookupKey(ct.cable_id, ct.cable_end, ct.connector, pos)
if key in peer_ct_by_key and peer_ct_by_key[key].pk != ct.pk:
raise CableTermination.MultipleObjectsReturned(
f"Multiple peer terminations found for cable {ct.cable_id} end {ct.cable_end} "
f"connector {ct.connector} position {pos}"
)
peer_ct_by_key[key] = ct
# Use _base_manager to ensure all termination objects are loaded
# regardless of any custom default manager filters (e.g. soft-delete).
# Note: Django's GenericForeignKey / ContentType.get_object_for_this_type()
# uses _default_manager, but _base_manager is safer for bulk resolution
# during path tracing.
term_by_type = {
type_id: model_by_type[type_id]._base_manager.in_bulk(ids) for type_id, ids in term_ids_by_type.items()
}
results = []
for key in lookup_keys:
ct = peer_ct_by_key.get(key)
if ct is None:
results.append((None, None))
else:
termination = term_by_type[ct.termination_type_id].get(ct.termination_id)
results.append((termination, key.position if termination is not None else None))
return results
@staticmethod
def get_position_list(n):
"""Return a list of integers from 1 to n, inclusive."""

View File

@@ -1,5 +1,6 @@
import itertools
import logging
from collections import Counter
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
@@ -907,25 +908,37 @@ class CablePath(models.Model):
# Build (termination, position) pairs by matching stacked positions
# to each termination's cable_positions. This correctly handles
# multiple terminations on different connectors of the same cable.
remaining = list(positions)
remaining = Counter(positions)
term_position_pairs = []
for term in terminations:
if term.cable_positions:
for cp in term.cable_positions:
if cp in remaining:
if remaining[cp]:
term_position_pairs.append((term, cp))
remaining.remove(cp)
remaining[cp] -= 1
# Fallback for when positions don't match cable_positions
# (e.g., empty position stack yielding [None])
if not term_position_pairs:
term_position_pairs = [(terminations[0], pos) for pos in positions]
for term, pos in term_position_pairs:
peer, new_pos = cable_profile.get_peer_termination(term, pos)
if peer not in remote_terminations:
peer_results = cable_profile.get_peer_terminations(term_position_pairs)
seen = set()
for peer, new_pos in peer_results:
# Deduplicate peer terminations by model type & PK.
key = None if peer is None else (peer._meta.concrete_model, peer.pk)
if key not in seen:
seen.add(key)
remote_terminations.append(peer)
new_positions.append(new_pos)
# If all peers resolved to None (no far-end terminations exist),
# treat as an empty result so the path is recorded as incomplete
# rather than falling through to the endpoint check with a stale
# None entry.
if remote_terminations and all(peer is None for peer in remote_terminations):
remote_terminations = []
position_stack.append(new_positions)
# Legacy (positionless) behavior
@@ -945,7 +958,9 @@ class CablePath(models.Model):
if not q_filter:
break
remote_cable_terminations = CableTermination.objects.filter(q_filter)
remote_cable_terminations = CableTermination.objects.filter(q_filter).prefetch_related(
'termination'
)
remote_terminations = [ct.termination for ct in remote_cable_terminations]
else:
# WirelessLink

View File

@@ -0,0 +1,208 @@
from dcim.cable_profiles import (
Breakout1C4Px4C1PCableProfile,
Single1C1PCableProfile,
Single1C4PCableProfile,
Trunk2C2PCableProfile,
Trunk2C4PShuffleCableProfile,
)
from dcim.choices import CableProfileChoices
from dcim.models import Cable, Interface
from dcim.tests.utils import CablePathTestCase
class CableProfilePeerTerminationTests(CablePathTestCase):
"""
Tests for BaseCableProfile.get_peer_termination() and get_peer_terminations().
Verifies that the batch method produces identical results to calling
the singular method in a loop.
"""
@classmethod
def setUpTestData(cls):
super().setUpTestData()
# Shared pool of interfaces — cables are created per-test since they
# mutate CableTerminations and cached fields on save.
cls.interfaces = [Interface(device=cls.device, name=f'Interface {i}') for i in range(1, 17)]
Interface.objects.bulk_create(cls.interfaces)
def _assert_batch_matches_singular(self, cable_profile, term_position_pairs):
"""
Helper: assert get_peer_terminations() returns the same results as
calling get_peer_termination() individually for each pair.
"""
expected = [cable_profile.get_peer_termination(term, pos) for term, pos in term_position_pairs]
actual = cable_profile.get_peer_terminations(term_position_pairs)
self.assertEqual(len(actual), len(expected))
for i, (exp, act) in enumerate(zip(expected, actual)):
exp_peer, exp_pos = exp
act_peer, act_pos = act
self.assertEqual(
(type(act_peer), getattr(act_peer, 'pk', None), act_pos),
(type(exp_peer), getattr(exp_peer, 'pk', None), exp_pos),
msg=f'Mismatch at index {i}: expected {exp}, got {act}',
)
def test_empty_pairs(self):
"""
get_peer_terminations() with an empty list returns an empty list.
"""
profile = Single1C1PCableProfile()
self.assertEqual(profile.get_peer_terminations([]), [])
def test_single_pair_fast_path(self):
"""
A single-pair call should use the fast path and produce the same
result as get_peer_termination().
"""
cable = Cable(
profile=CableProfileChoices.SINGLE_1C1P,
a_terminations=[self.interfaces[0]],
b_terminations=[self.interfaces[1]],
)
cable.clean()
cable.save()
self.interfaces[0].refresh_from_db()
profile = Single1C1PCableProfile()
self._assert_batch_matches_singular(profile, [(self.interfaces[0], 1)])
def test_single_connector_multi_position(self):
"""
Batch resolution on a Single 1C4P profile should return the same
peers as individual lookups for each position.
"""
cable = Cable(
profile=CableProfileChoices.SINGLE_1C4P,
a_terminations=[self.interfaces[0]],
b_terminations=[self.interfaces[4]],
)
cable.clean()
cable.save()
self.interfaces[0].refresh_from_db()
profile = Single1C4PCableProfile()
# Query all 4 positions from the A-side termination
pairs = [(self.interfaces[0], pos) for pos in range(1, 5)]
self._assert_batch_matches_singular(profile, pairs)
def test_multi_connector_multi_position(self):
"""
Batch resolution on a Trunk 2C2P profile across both connectors
should match individual lookups.
"""
cable = Cable(
profile=CableProfileChoices.TRUNK_2C2P,
a_terminations=[self.interfaces[0], self.interfaces[1]],
b_terminations=[self.interfaces[2], self.interfaces[3]],
)
cable.clean()
cable.save()
for iface in self.interfaces[:4]:
iface.refresh_from_db()
profile = Trunk2C2PCableProfile()
# Build pairs for both A-side terminations across their positions
pairs = []
for iface in self.interfaces[:2]:
for pos in iface.cable_positions:
pairs.append((iface, pos))
self._assert_batch_matches_singular(profile, pairs)
def test_shuffle_profile_mapping(self):
"""
Batch resolution on a shuffle profile should correctly apply the
non-linear position mapping.
"""
cable = Cable(
profile=CableProfileChoices.TRUNK_2C4P_SHUFFLE,
a_terminations=[self.interfaces[0], self.interfaces[1]],
b_terminations=[self.interfaces[2], self.interfaces[3]],
)
cable.clean()
cable.save()
for iface in self.interfaces[:4]:
iface.refresh_from_db()
profile = Trunk2C4PShuffleCableProfile()
pairs = []
for iface in self.interfaces[:2]:
for pos in iface.cable_positions:
pairs.append((iface, pos))
self._assert_batch_matches_singular(profile, pairs)
def test_breakout_profile(self):
"""
Batch resolution on a breakout profile should correctly map A-side
positions to different B-side connectors.
"""
cable = Cable(
profile=CableProfileChoices.BREAKOUT_1C4P_4C1P,
a_terminations=[self.interfaces[8]],
b_terminations=self.interfaces[9:13],
)
cable.clean()
cable.save()
self.interfaces[8].refresh_from_db()
for iface in self.interfaces[9:13]:
iface.refresh_from_db()
profile = Breakout1C4Px4C1PCableProfile()
# Test A→B direction (one connector, 4 positions → 4 connectors)
a_pairs = [(self.interfaces[8], pos) for pos in self.interfaces[8].cable_positions]
self._assert_batch_matches_singular(profile, a_pairs)
# Test B→A direction (4 connectors, 1 position each → one connector)
b_pairs = [(iface, 1) for iface in self.interfaces[9:13]]
self._assert_batch_matches_singular(profile, b_pairs)
def test_multi_position_single_termination(self):
"""
When a single-connector multi-position profile has only one termination
per side, all positions should resolve to the same peer object. The batch
method must return identical results to the singular method for each.
"""
# Use a multi-position profile but only connect one termination per side.
# The CableTermination will have positions=[1,2,3,4] but only one object
# is attached, so querying any position still resolves to that same peer.
cable = Cable(
profile=CableProfileChoices.SINGLE_1C4P,
a_terminations=[self.interfaces[0]],
b_terminations=[self.interfaces[1]],
)
cable.clean()
cable.save()
self.interfaces[0].refresh_from_db()
profile = Single1C4PCableProfile()
# All 4 positions should resolve to the same B-side termination
pairs = [(self.interfaces[0], pos) for pos in range(1, 5)]
self._assert_batch_matches_singular(profile, pairs)
def test_duplicate_pairs(self):
"""
Submitting the same (termination, position) pair multiple times should
return the correct result for each occurrence without errors.
"""
cable = Cable(
profile=CableProfileChoices.SINGLE_1C1P,
a_terminations=[self.interfaces[0]],
b_terminations=[self.interfaces[1]],
)
cable.clean()
cable.save()
self.interfaces[0].refresh_from_db()
profile = Single1C1PCableProfile()
# The same pair submitted three times
pairs = [(self.interfaces[0], 1)] * 3
self._assert_batch_matches_singular(profile, pairs)

View File

@@ -14,6 +14,7 @@ class CablePathTests(CablePathTestCase):
Tests are numbered as follows:
1XX: Test direct connections using each profile
2XX: Topology tests replicated from the legacy test case and adapted to use profiles
3XX: Dynamic port mapping and termination changes
"""
def test_101_cable_profile_single_1c1p(self):
@@ -1223,6 +1224,98 @@ class CablePathTests(CablePathTestCase):
# Test SVG generation
CableTraceSVG(interfaces[0]).render()
def test_110_partial_termination_profiled_trunk(self):
"""
Tests that tracing through a partially terminated profiled cable
produces a complete path for the connected pair and an incomplete
path for the unconnected pair, without errors. Also verifies that
attaching the missing termination completes the previously incomplete path.
[IF1] --C1-- [IF3]
[IF2] (empty)
Cable profile: Trunk 2C1P with only one B-side termination.
"""
interfaces = [
Interface.objects.create(device=self.device, name='Interface 1'),
Interface.objects.create(device=self.device, name='Interface 2'),
Interface.objects.create(device=self.device, name='Interface 3'),
]
# Create a 2-connector trunk cable with both A-side connectors
# populated but only one B-side connector terminated.
cable1 = Cable(
profile=CableProfileChoices.TRUNK_2C1P,
a_terminations=[interfaces[0], interfaces[1]],
b_terminations=[interfaces[2]],
)
cable1.clean()
cable1.save()
# IF1 (connector 1) → IF3 (connector 1): complete path
path1 = self.assertPathExists(
(interfaces[0], cable1, interfaces[2]),
is_complete=True,
is_active=True,
)
# IF3 (connector 1) → IF1 (connector 1): complete path (reverse)
path2 = self.assertPathExists(
(interfaces[2], cable1, interfaces[0]),
is_complete=True,
is_active=True,
)
# IF2 (connector 2) has no B-side peer.
# Tracing should stop at this segment, and the resulting path
# should remain incomplete.
# Verify via the origin's _path reference rather than matching
# the exact path shape directly.
interfaces[1].refresh_from_db()
self.assertIsNotNone(interfaces[1]._path_id)
path3 = CablePath.objects.get(pk=interfaces[1]._path_id)
self.assertFalse(path3.is_complete)
self.assertTrue(path3.is_active)
for iface in interfaces:
iface.refresh_from_db()
self.assertPathIsSet(interfaces[0], path1)
self.assertPathIsSet(interfaces[2], path2)
self.assertPathIsSet(interfaces[1], path3)
# Verify connector/position assignments
self.assertEqual(interfaces[0].cable_connector, 1)
self.assertEqual(interfaces[0].cable_positions, [1])
self.assertEqual(interfaces[1].cable_connector, 2)
self.assertEqual(interfaces[1].cable_positions, [1])
self.assertEqual(interfaces[2].cable_connector, 1)
self.assertEqual(interfaces[2].cable_positions, [1])
# Now attach the missing B-side termination and verify the
# previously incomplete path becomes complete.
interface4 = Interface.objects.create(device=self.device, name='Interface 4')
cable1.b_terminations = [interfaces[2], interface4]
cable1.clean()
cable1.save()
path4 = self.assertPathExists(
(interfaces[1], cable1, interface4),
is_complete=True,
is_active=True,
)
path5 = self.assertPathExists(
(interface4, cable1, interfaces[1]),
is_complete=True,
is_active=True,
)
interfaces[1].refresh_from_db()
interface4.refresh_from_db()
self.assertPathIsSet(interfaces[1], path4)
self.assertPathIsSet(interface4, path5)
self.assertEqual(interface4.cable_connector, 2)
self.assertEqual(interface4.cable_positions, [1])
def test_202_single_path_via_pass_through_with_breakouts(self):
"""
[IF1] --C1-- [FP1] [RP1] --C2-- [IF3]