Fixes #21949: Fix recursive power utilization calculation (#21997)

This commit is contained in:
Jeremy Stretch
2026-04-27 08:35:45 -04:00
committed by GitHub
parent 2fd6924d26
commit c3c26332ad
2 changed files with 213 additions and 13 deletions

View File

@@ -5,7 +5,6 @@ from django.contrib.postgres.fields import ArrayField
from django.core.exceptions import ObjectDoesNotExist, ValidationError
from django.core.validators import MaxValueValidator, MinValueValidator
from django.db import models
from django.db.models import Sum
from django.utils.translation import gettext_lazy as _
from mptt.models import MPTTModel, TreeForeignKey
@@ -523,7 +522,7 @@ class PowerPort(ModularComponentModel, CabledObjectModel, PathEndpoint, Tracking
return PowerPort.objects.filter(q)
def get_power_draw(self):
def get_power_draw(self, _seen=None):
"""
Return the allocated and maximum power draw (in VA) and child PowerOutlet count for this PowerPort.
"""
@@ -531,13 +530,34 @@ class PowerPort(ModularComponentModel, CabledObjectModel, PathEndpoint, Tracking
# Calculate aggregate draw of all child power outlets if no numbers have been defined manually
if self.allocated_draw is None and self.maximum_draw is None:
utilization = self.get_downstream_powerports().aggregate(
maximum_draw_total=Sum('maximum_draw'),
allocated_draw_total=Sum('allocated_draw'),
)
def _aggregate(powerports, seen):
# Recursively resolve the draw for each downstream PowerPort. Using the per-port value
# (rather than a SQL aggregate over allocated_draw/maximum_draw) allows the draw to
# propagate through intermediate auto-mode PowerPorts, e.g. PDU-internal fuse chains.
# `seen` tracks visited PowerPorts to prevent infinite recursion if the topology
# happens to form a cycle.
allocated_total = 0
maximum_total = 0
for powerport in powerports:
if powerport.pk in seen:
continue
seen.add(powerport.pk)
draw = powerport.get_power_draw(_seen=seen)
allocated_total += draw['allocated']
maximum_total += draw['maximum']
return allocated_total, maximum_total
# Seed each _aggregate() call with a fresh copy of the inherited visited set so the full
# and per-leg aggregations are independent. Otherwise, ports visited during the full
# aggregation would be skipped during the per-leg passes.
base_seen = set(_seen) if _seen else set()
base_seen.add(self.pk)
allocated, maximum = _aggregate(self.get_downstream_powerports(), set(base_seen))
ret = {
'allocated': utilization['allocated_draw_total'] or 0,
'maximum': utilization['maximum_draw_total'] or 0,
'allocated': allocated,
'maximum': maximum,
'outlet_count': self.poweroutlets.count(),
'legs': [],
}
@@ -546,14 +566,13 @@ class PowerPort(ModularComponentModel, CabledObjectModel, PathEndpoint, Tracking
if len(self.link_peers) == 1 and isinstance(self.link_peers[0], PowerFeed) and \
self.link_peers[0].phase == PowerFeedPhaseChoices.PHASE_3PHASE:
for leg, leg_name in PowerOutletFeedLegChoices:
utilization = self.get_downstream_powerports(leg=leg).aggregate(
maximum_draw_total=Sum('maximum_draw'),
allocated_draw_total=Sum('allocated_draw'),
leg_allocated, leg_maximum = _aggregate(
self.get_downstream_powerports(leg=leg), set(base_seen)
)
ret['legs'].append({
'name': leg_name,
'allocated': utilization['allocated_draw_total'] or 0,
'maximum': utilization['maximum_draw_total'] or 0,
'allocated': leg_allocated,
'maximum': leg_maximum,
'outlet_count': self.poweroutlets.filter(feed_leg=leg).count(),
})

View File

@@ -1557,3 +1557,184 @@ class SiteSignalTestCase(TestCase):
# Regression test for #21045: should not raise ValueError
site.save()
class PowerPortDrawTestCase(TestCase):
"""
Tests for PowerPort.get_power_draw() power aggregation logic.
"""
@classmethod
def setUpTestData(cls):
cls.site = Site.objects.create(name='Test Site', slug='test-site')
manufacturer = Manufacturer.objects.create(name='Generic', slug='generic')
device_type = DeviceType.objects.create(manufacturer=manufacturer, model='Test Device Type')
role = DeviceRole.objects.create(name='Test Role', slug='test-role')
cls.pdu = Device.objects.create(
device_type=device_type, role=role, site=cls.site, name='pdu'
)
cls.server = Device.objects.create(
device_type=device_type, role=role, site=cls.site, name='server'
)
def test_direct_draw_aggregation(self):
"""
Sanity check: with one PowerOutlet chained directly to a downstream PSU PowerPort,
the upstream PowerPort should reflect the PSU's allocated/maximum draw.
[main] -- [outlet] --C-- [psu]
"""
main = PowerPort.objects.create(device=self.pdu, name='main')
outlet = PowerOutlet.objects.create(device=self.pdu, name='outlet', power_port=main)
psu = PowerPort.objects.create(
device=self.server, name='psu', allocated_draw=200, maximum_draw=400
)
Cable(a_terminations=[outlet], b_terminations=[psu]).save()
draw = main.get_power_draw()
self.assertEqual(draw['allocated'], 200)
self.assertEqual(draw['maximum'], 400)
@tag('regression')
def test_recursive_draw_through_intermediate_powerport(self):
"""
Regression test for #21949: A PDU modeled with internal fuses (intermediate PowerPorts in
auto mode) should still aggregate downstream PSU draw up to the main PowerPort.
[main] -- [feedback] --C-- [fuse] -- [outlet] --C-- [psu]
Both `main` and `fuse` are in auto mode (no allocated_draw/maximum_draw set). The draw
reported by `psu` must propagate through `fuse` and be reflected at `main`.
"""
main = PowerPort.objects.create(device=self.pdu, name='main')
feedback = PowerOutlet.objects.create(device=self.pdu, name='feedback', power_port=main)
fuse = PowerPort.objects.create(device=self.pdu, name='fuse')
outlet = PowerOutlet.objects.create(device=self.pdu, name='outlet', power_port=fuse)
psu = PowerPort.objects.create(
device=self.server, name='psu', allocated_draw=150, maximum_draw=300
)
Cable(a_terminations=[feedback], b_terminations=[fuse]).save()
Cable(a_terminations=[outlet], b_terminations=[psu]).save()
fuse_draw = fuse.get_power_draw()
self.assertEqual(fuse_draw['allocated'], 150)
self.assertEqual(fuse_draw['maximum'], 300)
main_draw = main.get_power_draw()
self.assertEqual(main_draw['allocated'], 150)
self.assertEqual(main_draw['maximum'], 300)
def test_intermediate_manual_override_stops_recursion(self):
"""
When an intermediate PowerPort has an explicit allocated_draw/maximum_draw, recursion should
stop there and the administratively defined values should be used.
"""
main = PowerPort.objects.create(device=self.pdu, name='main')
feedback = PowerOutlet.objects.create(device=self.pdu, name='feedback', power_port=main)
fuse = PowerPort.objects.create(
device=self.pdu, name='fuse', allocated_draw=500, maximum_draw=1000
)
outlet = PowerOutlet.objects.create(device=self.pdu, name='outlet', power_port=fuse)
psu = PowerPort.objects.create(
device=self.server, name='psu', allocated_draw=150, maximum_draw=300
)
Cable(a_terminations=[feedback], b_terminations=[fuse]).save()
Cable(a_terminations=[outlet], b_terminations=[psu]).save()
main_draw = main.get_power_draw()
self.assertEqual(main_draw['allocated'], 500)
self.assertEqual(main_draw['maximum'], 1000)
def _connect_three_phase_feed(self, powerport):
"""
Helper: attach `powerport` via cable to a newly-created three-phase PowerFeed.
"""
power_panel = PowerPanel.objects.create(site=self.site, name='Panel')
power_feed = PowerFeed.objects.create(
power_panel=power_panel,
name='Feed',
phase=PowerFeedPhaseChoices.PHASE_3PHASE,
)
Cable(a_terminations=[powerport], b_terminations=[power_feed]).save()
@tag('regression')
def test_three_phase_per_leg_aggregation(self):
"""
Regression test: per-leg totals for a main PowerPort connected to a three-phase PowerFeed
must be populated even when the full aggregation runs first. Previously, a shared visited
set caused downstream ports to be skipped during the per-leg passes, zeroing the legs.
[main] --C-- [3-phase PowerFeed]
├── [outlet_A] (leg A) --C-- [portA] (allocated=100, maximum=200)
├── [outlet_B] (leg B) --C-- [portB] (allocated=200, maximum=400)
└── [outlet_C] (leg C) --C-- [portC] (allocated=300, maximum=600)
"""
main = PowerPort.objects.create(device=self.pdu, name='main')
self._connect_three_phase_feed(main)
leg_specs = [
(PowerOutletFeedLegChoices.FEED_LEG_A, 100, 200),
(PowerOutletFeedLegChoices.FEED_LEG_B, 200, 400),
(PowerOutletFeedLegChoices.FEED_LEG_C, 300, 600),
]
for leg, allocated, maximum in leg_specs:
outlet = PowerOutlet.objects.create(
device=self.pdu, name=f'outlet_{leg}', power_port=main, feed_leg=leg
)
port = PowerPort.objects.create(
device=self.server, name=f'psu_{leg}',
allocated_draw=allocated, maximum_draw=maximum,
)
Cable(a_terminations=[outlet], b_terminations=[port]).save()
# Re-fetch to clear cached_property values populated before cable creation
main = PowerPort.objects.get(pk=main.pk)
draw = main.get_power_draw()
self.assertEqual(draw['allocated'], 600)
self.assertEqual(draw['maximum'], 1200)
legs_by_name = {leg['name']: leg for leg in draw['legs']}
self.assertEqual(legs_by_name['A']['allocated'], 100)
self.assertEqual(legs_by_name['A']['maximum'], 200)
self.assertEqual(legs_by_name['B']['allocated'], 200)
self.assertEqual(legs_by_name['B']['maximum'], 400)
self.assertEqual(legs_by_name['C']['allocated'], 300)
self.assertEqual(legs_by_name['C']['maximum'], 600)
@tag('regression')
def test_three_phase_per_leg_recursive_aggregation(self):
"""
Regression test for #21949 on three-phase feeds: per-leg totals must aggregate through
intermediate auto-mode PowerPorts (the PDU-internal "fuse" pattern).
[main] --C-- [3-phase PowerFeed]
└── [feedback_A] (leg A) --C-- [fuse_A] (auto)
└── [outlet_A] (leg A) --C-- [psu_A] (allocated=100)
"""
main = PowerPort.objects.create(device=self.pdu, name='main')
self._connect_three_phase_feed(main)
feedback = PowerOutlet.objects.create(
device=self.pdu, name='feedback_A', power_port=main,
feed_leg=PowerOutletFeedLegChoices.FEED_LEG_A,
)
fuse = PowerPort.objects.create(device=self.pdu, name='fuse_A')
outlet = PowerOutlet.objects.create(
device=self.pdu, name='outlet_A', power_port=fuse,
feed_leg=PowerOutletFeedLegChoices.FEED_LEG_A,
)
psu = PowerPort.objects.create(
device=self.server, name='psu_A', allocated_draw=100, maximum_draw=200
)
Cable(a_terminations=[feedback], b_terminations=[fuse]).save()
Cable(a_terminations=[outlet], b_terminations=[psu]).save()
# Re-fetch to clear cached_property values populated before cable creation
main = PowerPort.objects.get(pk=main.pk)
draw = main.get_power_draw()
self.assertEqual(draw['allocated'], 100)
self.assertEqual(draw['maximum'], 200)
legs_by_name = {leg['name']: leg for leg in draw['legs']}
self.assertEqual(legs_by_name['A']['allocated'], 100)
self.assertEqual(legs_by_name['A']['maximum'], 200)
self.assertEqual(legs_by_name['B']['allocated'], 0)
self.assertEqual(legs_by_name['C']['allocated'], 0)