Closes #20564: Many-to-many pass-through port mappings (#20851)

This commit is contained in:
Jeremy Stretch
2025-12-09 12:17:17 -05:00
committed by GitHub
parent 97d0a16fd4
commit 17d8f78ae3
35 changed files with 2512 additions and 941 deletions

View File

@@ -1,4 +1,5 @@
import itertools
import logging
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
@@ -22,7 +23,7 @@ from utilities.fields import ColorField, GenericArrayForeignKey
from utilities.querysets import RestrictedQuerySet
from utilities.serialization import deserialize_object, serialize_object
from wireless.models import WirelessLink
from .device_components import FrontPort, PathEndpoint, RearPort
from .device_components import FrontPort, PathEndpoint, PortMapping, RearPort
__all__ = (
'Cable',
@@ -30,6 +31,8 @@ __all__ = (
'CableTermination',
)
logger = logging.getLogger(f'netbox.{__name__}')
trace_paths = Signal()
@@ -666,7 +669,13 @@ class CablePath(models.Model):
is_active = True
is_split = False
logger.debug(f'Tracing cable path from {terminations}...')
segment = 0
while terminations:
segment += 1
logger.debug(f'[Path segment #{segment}] Position stack: {position_stack}')
logger.debug(f'[Path segment #{segment}] Local terminations: {terminations}')
# Terminations must all be of the same type
if not all(isinstance(t, type(terminations[0])) for t in terminations[1:]):
@@ -697,7 +706,10 @@ class CablePath(models.Model):
position_stack.append([terminations[0].cable_position])
# Step 2: Determine the attached links (Cable or WirelessLink), if any
links = [termination.link for termination in terminations if termination.link is not None]
links = list(dict.fromkeys(
termination.link for termination in terminations if termination.link is not None
))
logger.debug(f'[Path segment #{segment}] Links: {links}')
if len(links) == 0:
if len(path) == 1:
# If this is the start of the path and no link exists, return None
@@ -760,10 +772,13 @@ class CablePath(models.Model):
link.interface_b if link.interface_a is terminations[0] else link.interface_a for link in links
]
logger.debug(f'[Path segment #{segment}] Remote terminations: {remote_terminations}')
# Remote Terminations must all be of the same type, otherwise return a split path
if not all(isinstance(t, type(remote_terminations[0])) for t in remote_terminations[1:]):
is_complete = False
is_split = True
logger.debug('Remote termination types differ; aborting trace.')
break
# Step 7: Record the far-end termination object(s)
@@ -777,58 +792,53 @@ class CablePath(models.Model):
if isinstance(remote_terminations[0], FrontPort):
# Follow FrontPorts to their corresponding RearPorts
rear_ports = RearPort.objects.filter(
pk__in=[t.rear_port_id for t in remote_terminations]
)
if len(rear_ports) > 1 or rear_ports[0].positions > 1:
position_stack.append([fp.rear_port_position for fp in remote_terminations])
terminations = rear_ports
elif isinstance(remote_terminations[0], RearPort):
if len(remote_terminations) == 1 and remote_terminations[0].positions == 1:
front_ports = FrontPort.objects.filter(
rear_port_id__in=[rp.pk for rp in remote_terminations],
rear_port_position=1
)
# Obtain the individual front ports based on the termination and all positions
elif len(remote_terminations) > 1 and position_stack:
if remote_terminations[0].positions > 1 and position_stack:
positions = position_stack.pop()
# Ensure we have a number of positions equal to the amount of remote terminations
if len(remote_terminations) != len(positions):
raise UnsupportedCablePath(
_("All positions counts within the path on opposite ends of links must match")
)
# Get our front ports
q_filter = Q()
for rt in remote_terminations:
position = positions.pop()
q_filter |= Q(rear_port_id=rt.pk, rear_port_position=position)
if q_filter is Q():
raise UnsupportedCablePath(_("Remote termination position filter is missing"))
front_ports = FrontPort.objects.filter(q_filter)
# Obtain the individual front ports based on the termination and position
elif position_stack:
front_ports = FrontPort.objects.filter(
rear_port_id=remote_terminations[0].pk,
rear_port_position__in=position_stack.pop()
)
# If all rear ports have a single position, we can just get the front ports
elif all([rp.positions == 1 for rp in remote_terminations]):
front_ports = FrontPort.objects.filter(rear_port_id__in=[rp.pk for rp in remote_terminations])
if len(front_ports) != len(remote_terminations):
# Some rear ports does not have a front port
is_split = True
break
else:
# No position indicated: path has split, so we stop at the RearPorts
q_filter |= Q(front_port=rt, front_port_position__in=positions)
port_mappings = PortMapping.objects.filter(q_filter)
elif remote_terminations[0].positions > 1:
is_split = True
logger.debug(
'Encountered front port mapped to multiple rear ports but position stack is empty; aborting '
'trace.'
)
break
else:
port_mappings = PortMapping.objects.filter(front_port__in=remote_terminations)
if not port_mappings:
break
terminations = front_ports
# Compile the list of RearPorts without duplication or altering their ordering
terminations = list(dict.fromkeys(mapping.rear_port for mapping in port_mappings))
if any(t.positions > 1 for t in terminations):
position_stack.append([mapping.rear_port_position for mapping in port_mappings])
elif isinstance(remote_terminations[0], RearPort):
# Follow RearPorts to their corresponding FrontPorts
if remote_terminations[0].positions > 1 and position_stack:
positions = position_stack.pop()
q_filter = Q()
for rt in remote_terminations:
q_filter |= Q(rear_port=rt, rear_port_position__in=positions)
port_mappings = PortMapping.objects.filter(q_filter)
elif remote_terminations[0].positions > 1:
is_split = True
logger.debug(
'Encountered rear port mapped to multiple front ports but position stack is empty; aborting '
'trace.'
)
break
else:
port_mappings = PortMapping.objects.filter(rear_port__in=remote_terminations)
if not port_mappings:
break
# Compile the list of FrontPorts without duplication or altering their ordering
terminations = list(dict.fromkeys(mapping.front_port for mapping in port_mappings))
if any(t.positions > 1 for t in terminations):
position_stack.append([mapping.front_port_position for mapping in port_mappings])
elif isinstance(remote_terminations[0], CircuitTermination):
# Follow a CircuitTermination to its corresponding CircuitTermination (A to Z or vice versa)
@@ -876,6 +886,7 @@ class CablePath(models.Model):
# Unsupported topology, mark as split and exit
is_complete = False
is_split = True
logger.warning('Encountered an unsupported topology; aborting trace.')
break
return cls(
@@ -954,16 +965,23 @@ class CablePath(models.Model):
# RearPort splitting to multiple FrontPorts with no stack position
if type(nodes[0]) is RearPort:
return FrontPort.objects.filter(rear_port__in=nodes)
return [
mapping.front_port for mapping in
PortMapping.objects.filter(rear_port__in=nodes).prefetch_related('front_port')
]
# Cable terminating to multiple FrontPorts mapped to different
# RearPorts connected to different cables
elif type(nodes[0]) is FrontPort:
return RearPort.objects.filter(pk__in=[fp.rear_port_id for fp in nodes])
if type(nodes[0]) is FrontPort:
return [
mapping.rear_port for mapping in
PortMapping.objects.filter(front_port__in=nodes).prefetch_related('rear_port')
]
# Cable terminating to multiple CircuitTerminations
elif type(nodes[0]) is CircuitTermination:
if type(nodes[0]) is CircuitTermination:
return [
ct.get_peer_termination() for ct in nodes
]
return []
def get_asymmetric_nodes(self):
"""