Compare commits

...

3 Commits

Author SHA1 Message Date
Jeremy Stretch
9a1f59dbe1 Adopt PR feedback; extend test suite 2026-04-24 13:39:49 -04:00
Jeremy Stretch
8b872a1447 Fixes #21949: Fix recursive power utilization calculation 2026-04-24 13:23:15 -04:00
github-actions
2fd6924d26 Update source translation strings 2026-04-24 05:46:33 +00:00
3 changed files with 241 additions and 41 deletions

View File

@@ -5,7 +5,6 @@ from django.contrib.postgres.fields import ArrayField
from django.core.exceptions import ObjectDoesNotExist, ValidationError
from django.core.validators import MaxValueValidator, MinValueValidator
from django.db import models
from django.db.models import Sum
from django.utils.translation import gettext_lazy as _
from mptt.models import MPTTModel, TreeForeignKey
@@ -523,7 +522,7 @@ class PowerPort(ModularComponentModel, CabledObjectModel, PathEndpoint, Tracking
return PowerPort.objects.filter(q)
def get_power_draw(self):
def get_power_draw(self, _seen=None):
"""
Return the allocated and maximum power draw (in VA) and child PowerOutlet count for this PowerPort.
"""
@@ -531,13 +530,34 @@ class PowerPort(ModularComponentModel, CabledObjectModel, PathEndpoint, Tracking
# Calculate aggregate draw of all child power outlets if no numbers have been defined manually
if self.allocated_draw is None and self.maximum_draw is None:
utilization = self.get_downstream_powerports().aggregate(
maximum_draw_total=Sum('maximum_draw'),
allocated_draw_total=Sum('allocated_draw'),
)
def _aggregate(powerports, seen):
# Recursively resolve the draw for each downstream PowerPort. Using the per-port value
# (rather than a SQL aggregate over allocated_draw/maximum_draw) allows the draw to
# propagate through intermediate auto-mode PowerPorts, e.g. PDU-internal fuse chains.
# `seen` tracks visited PowerPorts to prevent infinite recursion if the topology
# happens to form a cycle.
allocated_total = 0
maximum_total = 0
for powerport in powerports:
if powerport.pk in seen:
continue
seen.add(powerport.pk)
draw = powerport.get_power_draw(_seen=seen)
allocated_total += draw['allocated']
maximum_total += draw['maximum']
return allocated_total, maximum_total
# Seed each _aggregate() call with a fresh copy of the inherited visited set so the full
# and per-leg aggregations are independent. Otherwise, ports visited during the full
# aggregation would be skipped during the per-leg passes.
base_seen = set(_seen) if _seen else set()
base_seen.add(self.pk)
allocated, maximum = _aggregate(self.get_downstream_powerports(), set(base_seen))
ret = {
'allocated': utilization['allocated_draw_total'] or 0,
'maximum': utilization['maximum_draw_total'] or 0,
'allocated': allocated,
'maximum': maximum,
'outlet_count': self.poweroutlets.count(),
'legs': [],
}
@@ -546,14 +566,13 @@ class PowerPort(ModularComponentModel, CabledObjectModel, PathEndpoint, Tracking
if len(self.link_peers) == 1 and isinstance(self.link_peers[0], PowerFeed) and \
self.link_peers[0].phase == PowerFeedPhaseChoices.PHASE_3PHASE:
for leg, leg_name in PowerOutletFeedLegChoices:
utilization = self.get_downstream_powerports(leg=leg).aggregate(
maximum_draw_total=Sum('maximum_draw'),
allocated_draw_total=Sum('allocated_draw'),
leg_allocated, leg_maximum = _aggregate(
self.get_downstream_powerports(leg=leg), set(base_seen)
)
ret['legs'].append({
'name': leg_name,
'allocated': utilization['allocated_draw_total'] or 0,
'maximum': utilization['maximum_draw_total'] or 0,
'allocated': leg_allocated,
'maximum': leg_maximum,
'outlet_count': self.poweroutlets.filter(feed_leg=leg).count(),
})

View File

@@ -1557,3 +1557,184 @@ class SiteSignalTestCase(TestCase):
# Regression test for #21045: should not raise ValueError
site.save()
class PowerPortDrawTestCase(TestCase):
"""
Tests for PowerPort.get_power_draw() power aggregation logic.
"""
@classmethod
def setUpTestData(cls):
cls.site = Site.objects.create(name='Test Site', slug='test-site')
manufacturer = Manufacturer.objects.create(name='Generic', slug='generic')
device_type = DeviceType.objects.create(manufacturer=manufacturer, model='Test Device Type')
role = DeviceRole.objects.create(name='Test Role', slug='test-role')
cls.pdu = Device.objects.create(
device_type=device_type, role=role, site=cls.site, name='pdu'
)
cls.server = Device.objects.create(
device_type=device_type, role=role, site=cls.site, name='server'
)
def test_direct_draw_aggregation(self):
"""
Sanity check: with one PowerOutlet chained directly to a downstream PSU PowerPort,
the upstream PowerPort should reflect the PSU's allocated/maximum draw.
[main] -- [outlet] --C-- [psu]
"""
main = PowerPort.objects.create(device=self.pdu, name='main')
outlet = PowerOutlet.objects.create(device=self.pdu, name='outlet', power_port=main)
psu = PowerPort.objects.create(
device=self.server, name='psu', allocated_draw=200, maximum_draw=400
)
Cable(a_terminations=[outlet], b_terminations=[psu]).save()
draw = main.get_power_draw()
self.assertEqual(draw['allocated'], 200)
self.assertEqual(draw['maximum'], 400)
@tag('regression')
def test_recursive_draw_through_intermediate_powerport(self):
"""
Regression test for #21949: A PDU modeled with internal fuses (intermediate PowerPorts in
auto mode) should still aggregate downstream PSU draw up to the main PowerPort.
[main] -- [feedback] --C-- [fuse] -- [outlet] --C-- [psu]
Both `main` and `fuse` are in auto mode (no allocated_draw/maximum_draw set). The draw
reported by `psu` must propagate through `fuse` and be reflected at `main`.
"""
main = PowerPort.objects.create(device=self.pdu, name='main')
feedback = PowerOutlet.objects.create(device=self.pdu, name='feedback', power_port=main)
fuse = PowerPort.objects.create(device=self.pdu, name='fuse')
outlet = PowerOutlet.objects.create(device=self.pdu, name='outlet', power_port=fuse)
psu = PowerPort.objects.create(
device=self.server, name='psu', allocated_draw=150, maximum_draw=300
)
Cable(a_terminations=[feedback], b_terminations=[fuse]).save()
Cable(a_terminations=[outlet], b_terminations=[psu]).save()
fuse_draw = fuse.get_power_draw()
self.assertEqual(fuse_draw['allocated'], 150)
self.assertEqual(fuse_draw['maximum'], 300)
main_draw = main.get_power_draw()
self.assertEqual(main_draw['allocated'], 150)
self.assertEqual(main_draw['maximum'], 300)
def test_intermediate_manual_override_stops_recursion(self):
"""
When an intermediate PowerPort has an explicit allocated_draw/maximum_draw, recursion should
stop there and the administratively defined values should be used.
"""
main = PowerPort.objects.create(device=self.pdu, name='main')
feedback = PowerOutlet.objects.create(device=self.pdu, name='feedback', power_port=main)
fuse = PowerPort.objects.create(
device=self.pdu, name='fuse', allocated_draw=500, maximum_draw=1000
)
outlet = PowerOutlet.objects.create(device=self.pdu, name='outlet', power_port=fuse)
psu = PowerPort.objects.create(
device=self.server, name='psu', allocated_draw=150, maximum_draw=300
)
Cable(a_terminations=[feedback], b_terminations=[fuse]).save()
Cable(a_terminations=[outlet], b_terminations=[psu]).save()
main_draw = main.get_power_draw()
self.assertEqual(main_draw['allocated'], 500)
self.assertEqual(main_draw['maximum'], 1000)
def _connect_three_phase_feed(self, powerport):
"""
Helper: attach `powerport` via cable to a newly-created three-phase PowerFeed.
"""
power_panel = PowerPanel.objects.create(site=self.site, name='Panel')
power_feed = PowerFeed.objects.create(
power_panel=power_panel,
name='Feed',
phase=PowerFeedPhaseChoices.PHASE_3PHASE,
)
Cable(a_terminations=[powerport], b_terminations=[power_feed]).save()
@tag('regression')
def test_three_phase_per_leg_aggregation(self):
"""
Regression test: per-leg totals for a main PowerPort connected to a three-phase PowerFeed
must be populated even when the full aggregation runs first. Previously, a shared visited
set caused downstream ports to be skipped during the per-leg passes, zeroing the legs.
[main] --C-- [3-phase PowerFeed]
├── [outlet_A] (leg A) --C-- [portA] (allocated=100, maximum=200)
├── [outlet_B] (leg B) --C-- [portB] (allocated=200, maximum=400)
└── [outlet_C] (leg C) --C-- [portC] (allocated=300, maximum=600)
"""
main = PowerPort.objects.create(device=self.pdu, name='main')
self._connect_three_phase_feed(main)
leg_specs = [
(PowerOutletFeedLegChoices.FEED_LEG_A, 100, 200),
(PowerOutletFeedLegChoices.FEED_LEG_B, 200, 400),
(PowerOutletFeedLegChoices.FEED_LEG_C, 300, 600),
]
for leg, allocated, maximum in leg_specs:
outlet = PowerOutlet.objects.create(
device=self.pdu, name=f'outlet_{leg}', power_port=main, feed_leg=leg
)
port = PowerPort.objects.create(
device=self.server, name=f'psu_{leg}',
allocated_draw=allocated, maximum_draw=maximum,
)
Cable(a_terminations=[outlet], b_terminations=[port]).save()
# Re-fetch to clear cached_property values populated before cable creation
main = PowerPort.objects.get(pk=main.pk)
draw = main.get_power_draw()
self.assertEqual(draw['allocated'], 600)
self.assertEqual(draw['maximum'], 1200)
legs_by_name = {leg['name']: leg for leg in draw['legs']}
self.assertEqual(legs_by_name['A']['allocated'], 100)
self.assertEqual(legs_by_name['A']['maximum'], 200)
self.assertEqual(legs_by_name['B']['allocated'], 200)
self.assertEqual(legs_by_name['B']['maximum'], 400)
self.assertEqual(legs_by_name['C']['allocated'], 300)
self.assertEqual(legs_by_name['C']['maximum'], 600)
@tag('regression')
def test_three_phase_per_leg_recursive_aggregation(self):
"""
Regression test for #21949 on three-phase feeds: per-leg totals must aggregate through
intermediate auto-mode PowerPorts (the PDU-internal "fuse" pattern).
[main] --C-- [3-phase PowerFeed]
└── [feedback_A] (leg A) --C-- [fuse_A] (auto)
└── [outlet_A] (leg A) --C-- [psu_A] (allocated=100)
"""
main = PowerPort.objects.create(device=self.pdu, name='main')
self._connect_three_phase_feed(main)
feedback = PowerOutlet.objects.create(
device=self.pdu, name='feedback_A', power_port=main,
feed_leg=PowerOutletFeedLegChoices.FEED_LEG_A,
)
fuse = PowerPort.objects.create(device=self.pdu, name='fuse_A')
outlet = PowerOutlet.objects.create(
device=self.pdu, name='outlet_A', power_port=fuse,
feed_leg=PowerOutletFeedLegChoices.FEED_LEG_A,
)
psu = PowerPort.objects.create(
device=self.server, name='psu_A', allocated_draw=100, maximum_draw=200
)
Cable(a_terminations=[feedback], b_terminations=[fuse]).save()
Cable(a_terminations=[outlet], b_terminations=[psu]).save()
# Re-fetch to clear cached_property values populated before cable creation
main = PowerPort.objects.get(pk=main.pk)
draw = main.get_power_draw()
self.assertEqual(draw['allocated'], 100)
self.assertEqual(draw['maximum'], 200)
legs_by_name = {leg['name']: leg for leg in draw['legs']}
self.assertEqual(legs_by_name['A']['allocated'], 100)
self.assertEqual(legs_by_name['A']['maximum'], 200)
self.assertEqual(legs_by_name['B']['allocated'], 0)
self.assertEqual(legs_by_name['C']['allocated'], 0)

View File

@@ -8,7 +8,7 @@ msgid ""
msgstr ""
"Project-Id-Version: PACKAGE VERSION\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2026-04-23 05:43+0000\n"
"POT-Creation-Date: 2026-04-24 05:46+0000\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
@@ -9308,109 +9308,109 @@ msgid "False"
msgstr ""
#: netbox/extras/models/customfields.py:572
#: netbox/extras/models/customfields.py:620
#: netbox/extras/models/customfields.py:625
#, python-brace-format
msgid "Values must match this regex: <code>{regex}</code>"
msgstr ""
#: netbox/extras/models/customfields.py:722
#: netbox/extras/models/customfields.py:729
#: netbox/extras/models/customfields.py:727
#: netbox/extras/models/customfields.py:734
msgid "Value must be a string."
msgstr ""
#: netbox/extras/models/customfields.py:724
#: netbox/extras/models/customfields.py:731
#: netbox/extras/models/customfields.py:729
#: netbox/extras/models/customfields.py:736
#, python-brace-format
msgid "Value must match regex '{regex}'"
msgstr ""
#: netbox/extras/models/customfields.py:736
#: netbox/extras/models/customfields.py:741
msgid "Value must be an integer."
msgstr ""
#: netbox/extras/models/customfields.py:739
#: netbox/extras/models/customfields.py:754
#: netbox/extras/models/customfields.py:744
#: netbox/extras/models/customfields.py:759
#, python-brace-format
msgid "Value must be at least {minimum}"
msgstr ""
#: netbox/extras/models/customfields.py:743
#: netbox/extras/models/customfields.py:758
#: netbox/extras/models/customfields.py:748
#: netbox/extras/models/customfields.py:763
#, python-brace-format
msgid "Value must not exceed {maximum}"
msgstr ""
#: netbox/extras/models/customfields.py:751
#: netbox/extras/models/customfields.py:756
msgid "Value must be a decimal."
msgstr ""
#: netbox/extras/models/customfields.py:763
#: netbox/extras/models/customfields.py:768
msgid "Value must be true or false."
msgstr ""
#: netbox/extras/models/customfields.py:771
#: netbox/extras/models/customfields.py:776
msgid "Date values must be in ISO 8601 format (YYYY-MM-DD)."
msgstr ""
#: netbox/extras/models/customfields.py:780
#: netbox/extras/models/customfields.py:785
msgid "Date and time values must be in ISO 8601 format (YYYY-MM-DD HH:MM:SS)."
msgstr ""
#: netbox/extras/models/customfields.py:787
#: netbox/extras/models/customfields.py:792
#, python-brace-format
msgid "Invalid choice ({value}) for choice set {choiceset}."
msgstr ""
#: netbox/extras/models/customfields.py:797
#: netbox/extras/models/customfields.py:802
#, python-brace-format
msgid "Invalid choice(s) ({value}) for choice set {choiceset}."
msgstr ""
#: netbox/extras/models/customfields.py:806
#: netbox/extras/models/customfields.py:811
#, python-brace-format
msgid "Value must be an object ID, not {type}"
msgstr ""
#: netbox/extras/models/customfields.py:812
#: netbox/extras/models/customfields.py:817
#, python-brace-format
msgid "Value must be a list of object IDs, not {type}"
msgstr ""
#: netbox/extras/models/customfields.py:816
#: netbox/extras/models/customfields.py:821
#, python-brace-format
msgid "Found invalid object ID: {id}"
msgstr ""
#: netbox/extras/models/customfields.py:819
#: netbox/extras/models/customfields.py:824
msgid "Required field cannot be empty."
msgstr ""
#: netbox/extras/models/customfields.py:839
#: netbox/extras/models/customfields.py:844
msgid "Base set of predefined choices (optional)"
msgstr ""
#: netbox/extras/models/customfields.py:851
#: netbox/extras/models/customfields.py:856
msgid "Choices are automatically ordered alphabetically"
msgstr ""
#: netbox/extras/models/customfields.py:858
#: netbox/extras/models/customfields.py:863
msgid "custom field choice set"
msgstr ""
#: netbox/extras/models/customfields.py:859
#: netbox/extras/models/customfields.py:864
msgid "custom field choice sets"
msgstr ""
#: netbox/extras/models/customfields.py:901
#: netbox/extras/models/customfields.py:906
msgid "Must define base or extra choices."
msgstr ""
#: netbox/extras/models/customfields.py:910
#: netbox/extras/models/customfields.py:915
#, python-brace-format
msgid "Duplicate value '{value}' found in extra choices."
msgstr ""
#: netbox/extras/models/customfields.py:935
#: netbox/extras/models/customfields.py:940
#, python-brace-format
msgid ""
"Cannot remove choice {choice} as there are {model} objects which reference "