Compare commits

..

22 Commits

Author SHA1 Message Date
Arthur
7b9ab87e38 cleanup 2026-04-10 12:04:48 -07:00
Arthur
90255a268f cleanup 2026-04-10 11:57:49 -07:00
Arthur
8418809344 #21879 - Add post_raw_create signal hook 2026-04-10 11:31:56 -07:00
Arthur
b39e1c73c1 #21879 - Add post_raw_create signal hook 2026-04-10 11:28:10 -07:00
github-actions
4ca688de57 Update source translation strings 2026-04-10 05:40:14 +00:00
bctiemann
ed7ebd9d98 Merge pull request #21863 from netbox-community/21801-duplicate-filename-allowed-when-upload-files-using-s3
Fixes #21801: Ensure unique Image Attachment filenames when using S3 storage
2026-04-09 13:47:54 -04:00
Martin Hauser
48037f6fed fix(extras): Reject unknown custom fields (#21861)
Add validation to reject unknown custom field names during API updates.
Ensure model.clean() normalization is preserved in serializers to remove
stale custom field data from both the database and change logs.
Filter stale keys during serialization to prevent lingering references.

Fixes #21529
2026-04-09 08:49:27 -07:00
Ibtissam El alami
0bc05f27f9 Fixes #21704: Add port mappings to DeviceType & ModuleType YAML export (#21859) 2026-04-09 09:41:14 -05:00
Martin Hauser
a93aae12fa Closes #21862: Stabilize ScriptModule tests and reduce CI noise (#21867) 2026-04-09 09:33:55 -05:00
Martin Hauser
cb7e97c7f7 docs(configuration): Expand S3 storage configuration examples
Update STORAGES configuration examples to include all three storage
backends (default, staticfiles, scripts) with complete option sets.
Add region_name to environment variable example and clarify usage for
S3-compatible services.

Fixes #21864
2026-04-09 09:52:07 -04:00
Martin Hauser
e864dc3ae0 fix(extras): Ensure unique Image Attachment names on S3
Make image attachment filename generation use Django's base collision
handling so overwrite-style storage backends behave like local file
storage.

This preserves the original filename for the first upload, adds a
suffix only on collision, and avoids duplicate image paths in object
change records.

Add regression tests for path generation and collision handling.

Fixes #21801
2026-04-08 22:16:36 +02:00
github-actions
dbb871b75a Update source translation strings 2026-04-08 05:32:13 +00:00
Jeremy Stretch
d75583828b Fixes #21835: Remove misleading help text from ColorField (#21852) 2026-04-07 22:50:41 +02:00
Martin Hauser
7ff7c6d17e feat(ui): Add colored rendering for related object attributes
Introduce `colored` parameter to `RelatedObjectAttr`,
`NestedObjectAttr`, and `ObjectListAttr` to render objects as colored
badges when they expose a `color` attribute.
Update badge template tag to support hex colors and optional URLs.
Apply colored rendering to circuit types, device roles, rack roles,
inventory item roles, and VM roles.

Fixes #21430
2026-04-07 16:40:18 -04:00
Jeremy Stretch
296e708e09 Fixes #21814: Correct display of custom script "last run" time (#21853) 2026-04-07 18:11:12 +02:00
Jeremy Stretch
1bbecef77d Fixes #21841: Fix display of the "edit" button for script modules (#21851) 2026-04-07 08:48:40 -07:00
Jeremy Stretch
1ebeb71ad8 Fixes #21845: Remove whitespace from connection values in interface CSV exports (#21850) 2026-04-07 10:38:22 -05:00
Martin Hauser
d6a1cc5558 test(tables): Add reusable StandardTableTestCase
Introduce `TableTestCases.StandardTableTestCase`, a shared base class
for model-backed table smoke tests. It currently discovers sortable
columns from list-view querysets and verifies that each renders without
exceptions in both ascending and descending order.

Add per-table smoke tests across circuits, core, dcim, extras, ipam,
tenancy, users, virtualization, vpn, and wireless apps.

Fixes #21766
2026-04-06 13:53:13 -04:00
github-actions
09f7df0726 Update source translation strings 2026-04-04 05:26:28 +00:00
Martin Hauser
f242f17ce5 Fixes #21542: Increase supported interface speed values above 2.1 Tbps (#21834) 2026-04-03 16:55:11 -05:00
bctiemann
7d71503ea2 Merge pull request #21837 from netbox-community/21795-update-humanize_speed-to-support-decimal-gbpstbps-output
Closes #21795: Improve humanize_speed formatting for decimal Gbps/Tbps values
2026-04-03 13:06:55 -04:00
Martin Hauser
e07a5966ae feat(dcim): Support decimal Gbps/Tbps output in humanize_speed
Update the humanize_speed template filter to always use the largest
appropriate unit, even when the result is not a whole number.
Previously, values like 2500000 Kbps rendered as "2500 Mbps" instead of
"2.5 Gbps", and 1600000000 Kbps rendered as "1600 Gbps" instead of
"1.6 Tbps".

Fixes #21795
2026-04-03 15:36:42 +02:00
55 changed files with 2008 additions and 735 deletions

View File

@@ -241,21 +241,49 @@ STORAGES = {
Within the `STORAGES` dictionary, `"default"` is used for image uploads, "staticfiles" is for static files and `"scripts"` is used for custom scripts.
If using a remote storage like S3, define the config as `STORAGES[key]["OPTIONS"]` for each storage item as needed. For example:
If using a remote storage such as S3 or an S3-compatible service, define the configuration as `STORAGES[key]["OPTIONS"]` for each storage item as needed. For example:
```python
STORAGES = {
"scripts": {
"BACKEND": "storages.backends.s3boto3.S3Boto3Storage",
"OPTIONS": {
'access_key': 'access key',
STORAGES = {
'default': {
'BACKEND': 'storages.backends.s3.S3Storage',
'OPTIONS': {
'bucket_name': 'netbox',
'access_key': 'access key',
'secret_key': 'secret key',
"allow_overwrite": True,
}
},
'region_name': 'us-east-1',
'endpoint_url': 'https://s3.example.com',
'location': 'media/',
},
},
'staticfiles': {
'BACKEND': 'storages.backends.s3.S3Storage',
'OPTIONS': {
'bucket_name': 'netbox',
'access_key': 'access key',
'secret_key': 'secret key',
'region_name': 'us-east-1',
'endpoint_url': 'https://s3.example.com',
'location': 'static/',
},
},
'scripts': {
'BACKEND': 'storages.backends.s3.S3Storage',
'OPTIONS': {
'bucket_name': 'netbox',
'access_key': 'access key',
'secret_key': 'secret key',
'region_name': 'us-east-1',
'endpoint_url': 'https://s3.example.com',
'location': 'scripts/',
'file_overwrite': True,
},
},
}
```
`bucket_name` is required for `S3Storage`. When using an S3-compatible service, set `region_name` and `endpoint_url` according to your provider.
The specific configuration settings for each storage backend can be found in the [django-storages documentation](https://django-storages.readthedocs.io/en/latest/index.html).
!!! note
@@ -279,6 +307,7 @@ STORAGES = {
'bucket_name': os.environ.get('AWS_STORAGE_BUCKET_NAME'),
'access_key': os.environ.get('AWS_S3_ACCESS_KEY_ID'),
'secret_key': os.environ.get('AWS_S3_SECRET_ACCESS_KEY'),
'region_name': os.environ.get('AWS_S3_REGION_NAME'),
'endpoint_url': os.environ.get('AWS_S3_ENDPOINT_URL'),
'location': 'media/',
}
@@ -289,6 +318,7 @@ STORAGES = {
'bucket_name': os.environ.get('AWS_STORAGE_BUCKET_NAME'),
'access_key': os.environ.get('AWS_S3_ACCESS_KEY_ID'),
'secret_key': os.environ.get('AWS_S3_SECRET_ACCESS_KEY'),
'region_name': os.environ.get('AWS_S3_REGION_NAME'),
'endpoint_url': os.environ.get('AWS_S3_ENDPOINT_URL'),
'location': 'static/',
}

View File

@@ -1,48 +1,46 @@
from django.test import RequestFactory, TestCase, tag
from circuits.models import CircuitGroupAssignment, CircuitTermination
from circuits.tables import CircuitGroupAssignmentTable, CircuitTerminationTable
from circuits.tables import *
from utilities.testing import TableTestCases
@tag('regression')
class CircuitTerminationTableTest(TestCase):
def test_every_orderable_field_does_not_throw_exception(self):
terminations = CircuitTermination.objects.all()
disallowed = {
'actions',
}
orderable_columns = [
column.name
for column in CircuitTerminationTable(terminations).columns
if column.orderable and column.name not in disallowed
]
fake_request = RequestFactory().get('/')
for col in orderable_columns:
for direction in ('-', ''):
table = CircuitTerminationTable(terminations)
table.order_by = f'{direction}{col}'
table.as_html(fake_request)
class CircuitTypeTableTest(TableTestCases.StandardTableTestCase):
table = CircuitTypeTable
@tag('regression')
class CircuitGroupAssignmentTableTest(TestCase):
def test_every_orderable_field_does_not_throw_exception(self):
assignment = CircuitGroupAssignment.objects.all()
disallowed = {
'actions',
}
class CircuitTableTest(TableTestCases.StandardTableTestCase):
table = CircuitTable
orderable_columns = [
column.name
for column in CircuitGroupAssignmentTable(assignment).columns
if column.orderable and column.name not in disallowed
]
fake_request = RequestFactory().get('/')
for col in orderable_columns:
for direction in ('-', ''):
table = CircuitGroupAssignmentTable(assignment)
table.order_by = f'{direction}{col}'
table.as_html(fake_request)
class CircuitTerminationTableTest(TableTestCases.StandardTableTestCase):
table = CircuitTerminationTable
class CircuitGroupTableTest(TableTestCases.StandardTableTestCase):
table = CircuitGroupTable
class CircuitGroupAssignmentTableTest(TableTestCases.StandardTableTestCase):
table = CircuitGroupAssignmentTable
class ProviderTableTest(TableTestCases.StandardTableTestCase):
table = ProviderTable
class ProviderAccountTableTest(TableTestCases.StandardTableTestCase):
table = ProviderAccountTable
class ProviderNetworkTableTest(TableTestCases.StandardTableTestCase):
table = ProviderNetworkTable
class VirtualCircuitTypeTableTest(TableTestCases.StandardTableTestCase):
table = VirtualCircuitTypeTable
class VirtualCircuitTableTest(TableTestCases.StandardTableTestCase):
table = VirtualCircuitTable
class VirtualCircuitTerminationTableTest(TableTestCases.StandardTableTestCase):
table = VirtualCircuitTerminationTable

View File

@@ -196,6 +196,20 @@ class CircuitTestCase(ViewTestCases.PrimaryObjectViewTestCase):
'comments': 'New comments',
}
def test_circuit_type_display_colored(self):
circuit_type = CircuitType.objects.first()
circuit_type.color = '12ab34'
circuit_type.save()
circuit = Circuit.objects.first()
self.add_permissions('circuits.view_circuit')
response = self.client.get(circuit.get_absolute_url())
self.assertHttpStatus(response, 200)
self.assertContains(response, circuit_type.name)
self.assertContains(response, 'background-color: #12ab34')
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'], EXEMPT_EXCLUDE_MODELS=[])
def test_bulk_import_objects_with_terminations(self):
site = Site.objects.first()

View File

@@ -73,7 +73,7 @@ class CircuitPanel(panels.ObjectAttributesPanel):
provider = attrs.RelatedObjectAttr('provider', linkify=True)
provider_account = attrs.RelatedObjectAttr('provider_account', linkify=True)
cid = attrs.TextAttr('cid', label=_('Circuit ID'), style='font-monospace', copy_button=True)
type = attrs.RelatedObjectAttr('type', linkify=True)
type = attrs.RelatedObjectAttr('type', linkify=True, colored=True)
status = attrs.ChoiceAttr('status')
distance = attrs.NumericAttr('distance', unit_accessor='get_distance_unit_display')
tenant = attrs.RelatedObjectAttr('tenant', linkify=True, grouped_by='group')
@@ -116,7 +116,7 @@ class VirtualCircuitPanel(panels.ObjectAttributesPanel):
provider_network = attrs.RelatedObjectAttr('provider_network', linkify=True)
provider_account = attrs.RelatedObjectAttr('provider_account', linkify=True)
cid = attrs.TextAttr('cid', label=_('Circuit ID'), style='font-monospace', copy_button=True)
type = attrs.RelatedObjectAttr('type', linkify=True)
type = attrs.RelatedObjectAttr('type', linkify=True, colored=True)
status = attrs.ChoiceAttr('status')
tenant = attrs.RelatedObjectAttr('tenant', linkify=True, grouped_by='group')
description = attrs.TextAttr('description')

View File

@@ -0,0 +1,26 @@
from core.models import ObjectChange
from core.tables import *
from utilities.testing import TableTestCases
class DataSourceTableTest(TableTestCases.StandardTableTestCase):
table = DataSourceTable
class DataFileTableTest(TableTestCases.StandardTableTestCase):
table = DataFileTable
class JobTableTest(TableTestCases.StandardTableTestCase):
table = JobTable
class ObjectChangeTableTest(TableTestCases.StandardTableTestCase):
table = ObjectChangeTable
queryset_sources = [
('ObjectChangeListView', ObjectChange.objects.valid_models()),
]
class ConfigRevisionTableTest(TableTestCases.StandardTableTestCase):
table = ConfigRevisionTable

View File

@@ -26,6 +26,7 @@ from tenancy.models import *
from users.filterset_mixins import OwnerFilterMixin
from users.models import User
from utilities.filters import (
MultiValueBigNumberFilter,
MultiValueCharFilter,
MultiValueContentTypeFilter,
MultiValueMACAddressFilter,
@@ -2175,7 +2176,7 @@ class InterfaceFilterSet(
distinct=False,
label=_('LAG interface (ID)'),
)
speed = MultiValueNumberFilter()
speed = MultiValueBigNumberFilter(min_value=0)
duplex = django_filters.MultipleChoiceFilter(
choices=InterfaceDuplexChoices,
distinct=False,

View File

@@ -20,7 +20,13 @@ from netbox.forms.mixins import ChangelogMessageMixin, OwnerMixin
from tenancy.models import Tenant
from users.models import User
from utilities.forms import BulkEditForm, add_blank_choice, form_from_model
from utilities.forms.fields import ColorField, DynamicModelChoiceField, DynamicModelMultipleChoiceField, JSONField
from utilities.forms.fields import (
ColorField,
DynamicModelChoiceField,
DynamicModelMultipleChoiceField,
JSONField,
PositiveBigIntegerField,
)
from utilities.forms.rendering import FieldSet, InlineFields, TabbedGroups
from utilities.forms.widgets import BulkEditNullBooleanSelect, NumberWithOptions
from virtualization.models import Cluster
@@ -1420,7 +1426,7 @@ class InterfaceBulkEditForm(
'device_id': '$device',
}
)
speed = forms.IntegerField(
speed = PositiveBigIntegerField(
label=_('Speed'),
required=False,
widget=NumberWithOptions(

View File

@@ -19,7 +19,7 @@ from tenancy.forms import ContactModelFilterForm, TenancyFilterForm
from tenancy.models import Tenant
from users.models import User
from utilities.forms import BOOLEAN_WITH_BLANK_CHOICES, FilterForm, add_blank_choice
from utilities.forms.fields import ColorField, DynamicModelMultipleChoiceField, TagFilterField
from utilities.forms.fields import ColorField, DynamicModelMultipleChoiceField, PositiveBigIntegerField, TagFilterField
from utilities.forms.rendering import FieldSet
from utilities.forms.widgets import NumberWithOptions
from virtualization.models import Cluster, ClusterGroup, VirtualMachine
@@ -1603,7 +1603,7 @@ class InterfaceFilterForm(PathEndpointFilterForm, DeviceComponentFilterForm):
choices=InterfaceTypeChoices,
required=False
)
speed = forms.IntegerField(
speed = PositiveBigIntegerField(
label=_('Speed'),
required=False,
widget=NumberWithOptions(

View File

@@ -47,7 +47,13 @@ if TYPE_CHECKING:
VRFFilter,
)
from netbox.graphql.enums import ColorEnum
from netbox.graphql.filter_lookups import FloatLookup, IntegerArrayLookup, IntegerLookup, TreeNodeFilter
from netbox.graphql.filter_lookups import (
BigIntegerLookup,
FloatLookup,
IntegerArrayLookup,
IntegerLookup,
TreeNodeFilter,
)
from users.graphql.filters import UserFilter
from virtualization.graphql.filters import ClusterFilter
from vpn.graphql.filters import L2VPNFilter, TunnelTerminationFilter
@@ -519,7 +525,7 @@ class InterfaceFilter(
strawberry_django.filter_field()
)
mgmt_only: FilterLookup[bool] | None = strawberry_django.filter_field()
speed: Annotated['IntegerLookup', strawberry.lazy('netbox.graphql.filter_lookups')] | None = (
speed: Annotated['BigIntegerLookup', strawberry.lazy('netbox.graphql.filter_lookups')] | None = (
strawberry_django.filter_field()
)
duplex: BaseFilterLookup[Annotated['InterfaceDuplexEnum', strawberry.lazy('dcim.graphql.enums')]] | None = (

View File

@@ -433,6 +433,7 @@ class MACAddressType(PrimaryObjectType):
)
class InterfaceType(IPAddressesMixin, ModularComponentType, CabledObjectMixin, PathEndpointMixin):
_name: str
speed: BigInt | None
wwn: str | None
parent: Annotated["InterfaceType", strawberry.lazy('dcim.graphql.types')] | None
bridge: Annotated["InterfaceType", strawberry.lazy('dcim.graphql.types')] | None

View File

@@ -0,0 +1,15 @@
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('dcim', '0226_modulebay_rebuild_tree'),
]
operations = [
migrations.AlterField(
model_name='interface',
name='speed',
field=models.PositiveBigIntegerField(blank=True, null=True),
),
]

View File

@@ -549,6 +549,14 @@ class PortTemplateMapping(PortMappingBase):
self.module_type = self.front_port.module_type
super().save(*args, **kwargs)
def to_yaml(self):
return {
'front_port': self.front_port.name,
'front_port_position': self.front_port_position,
'rear_port': self.rear_port.name,
'rear_port_position': self.rear_port_position,
}
class FrontPortTemplate(ModularComponentTemplateModel):
"""

View File

@@ -806,7 +806,7 @@ class Interface(
verbose_name=_('management only'),
help_text=_('This interface is used only for out-of-band management')
)
speed = models.PositiveIntegerField(
speed = models.PositiveBigIntegerField(
blank=True,
null=True,
verbose_name=_('speed (Kbps)')

View File

@@ -275,6 +275,15 @@ class DeviceType(ImageAttachmentsMixin, PrimaryModel, WeightMixin):
data['rear-ports'] = [
c.to_yaml() for c in self.rearporttemplates.all()
]
# Port mappings
port_mapping_data = [
c.to_yaml() for c in self.port_mappings.all()
]
if port_mapping_data:
data['port-mappings'] = port_mapping_data
if self.modulebaytemplates.exists():
data['module-bays'] = [
c.to_yaml() for c in self.modulebaytemplates.all()

View File

@@ -192,6 +192,14 @@ class ModuleType(ImageAttachmentsMixin, PrimaryModel, WeightMixin):
c.to_yaml() for c in self.rearporttemplates.all()
]
# Port mappings
port_mapping_data = [
c.to_yaml() for c in self.port_mappings.all()
]
if port_mapping_data:
data['port-mappings'] = port_mapping_data
return yaml.dump(dict(data), sort_keys=False)

View File

@@ -6,6 +6,7 @@ from django.dispatch import receiver
from dcim.choices import CableEndChoices, LinkStatusChoices
from ipam.models import Prefix
from netbox.signals import post_raw_create
from virtualization.models import Cluster, VMInterface
from wireless.models import WirelessLAN
@@ -166,6 +167,27 @@ def retrace_cable_paths(instance, **kwargs):
cablepath.retrace()
@receiver(post_raw_create, sender=Cable)
def retrace_cable_paths_after_raw_create(sender, pks, **kwargs):
"""
When Cables are created via a raw save, the normal Cable.save() path is bypassed,
so trace_paths is never sent. Retrace paths for all newly created cables.
Callers must only send this signal after all CableTerminations for the given cables
have been applied. If a cable has no terminations, update_connected_endpoints will
find empty termination lists and skip path creation — so this is safe to call even
if terminations are absent, but path tracing will have no effect.
Note: raw=False (the default) is intentional here — we explicitly want
update_connected_endpoints to run, unlike during fixture loading (raw=True).
"""
logger = logging.getLogger('netbox.dcim.cable')
for cable in Cable.objects.filter(pk__in=pks):
cable._terminations_modified = True
trace_paths.send(Cable, instance=cable, created=True)
logger.debug(f"Retraced cable paths for Cable {cable.pk}")
@receiver((post_delete, post_save), sender=PortMapping)
def update_passthrough_port_paths(instance, **kwargs):
"""

View File

@@ -382,6 +382,17 @@ class PathEndpointTable(CableTerminationTable):
orderable=False
)
def value_connection(self, value):
if value:
connections = []
for termination in value:
if hasattr(termination, 'parent_object'):
connections.append(f'{termination.parent_object} > {termination}')
else:
connections.append(str(termination))
return ', '.join(connections)
return None
class ConsolePortTable(ModularDeviceComponentTable, PathEndpointTable):
device = tables.Column(
@@ -683,6 +694,15 @@ class InterfaceTable(BaseInterfaceTable, ModularDeviceComponentTable, PathEndpoi
orderable=False
)
def value_connection(self, record, value):
if record.is_virtual and hasattr(record, 'virtual_circuit_termination') and record.virtual_circuit_termination:
connections = [
f"{t.interface.parent_object} > {t.interface} via {t.parent_object}"
for t in record.connected_endpoints
]
return ', '.join(connections)
return super().value_connection(value)
class Meta(DeviceComponentTable.Meta):
model = models.Interface
fields = (

View File

@@ -1930,9 +1930,9 @@ class InterfaceTest(Mixins.ComponentTraceMixin, APIViewTestCases.APIViewTestCase
{
'device': device.pk,
'name': 'Interface 4',
'type': '1000base-t',
'type': 'other',
'mode': InterfaceModeChoices.MODE_TAGGED,
'speed': 1000000,
'speed': 16_000_000_000,
'duplex': 'full',
'vrf': vrfs[0].pk,
'poe_mode': InterfacePoEModeChoices.MODE_PD,

View File

@@ -4655,7 +4655,7 @@ class InterfaceTestCase(TestCase, DeviceComponentFilterSetTests, ChangeLoggedFil
enabled=True,
mgmt_only=True,
tx_power=40,
speed=100000,
speed=16_000_000_000,
duplex='full',
poe_mode=InterfacePoEModeChoices.MODE_PD,
poe_type=InterfacePoETypeChoices.TYPE_2_8023AT,
@@ -4757,7 +4757,7 @@ class InterfaceTestCase(TestCase, DeviceComponentFilterSetTests, ChangeLoggedFil
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_speed(self):
params = {'speed': [1000000, 100000]}
params = {'speed': [16_000_000_000, 1_000_000, 100_000]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_duplex(self):

View File

@@ -0,0 +1,204 @@
from dcim.models import ConsolePort, Interface, PowerPort
from dcim.tables import *
from utilities.testing import TableTestCases
#
# Sites
#
class RegionTableTest(TableTestCases.StandardTableTestCase):
table = RegionTable
class SiteGroupTableTest(TableTestCases.StandardTableTestCase):
table = SiteGroupTable
class SiteTableTest(TableTestCases.StandardTableTestCase):
table = SiteTable
class LocationTableTest(TableTestCases.StandardTableTestCase):
table = LocationTable
#
# Racks
#
class RackRoleTableTest(TableTestCases.StandardTableTestCase):
table = RackRoleTable
class RackTypeTableTest(TableTestCases.StandardTableTestCase):
table = RackTypeTable
class RackTableTest(TableTestCases.StandardTableTestCase):
table = RackTable
class RackReservationTableTest(TableTestCases.StandardTableTestCase):
table = RackReservationTable
#
# Device types
#
class ManufacturerTableTest(TableTestCases.StandardTableTestCase):
table = ManufacturerTable
class DeviceTypeTableTest(TableTestCases.StandardTableTestCase):
table = DeviceTypeTable
#
# Module types
#
class ModuleTypeProfileTableTest(TableTestCases.StandardTableTestCase):
table = ModuleTypeProfileTable
class ModuleTypeTableTest(TableTestCases.StandardTableTestCase):
table = ModuleTypeTable
class ModuleTableTest(TableTestCases.StandardTableTestCase):
table = ModuleTable
#
# Devices
#
class DeviceRoleTableTest(TableTestCases.StandardTableTestCase):
table = DeviceRoleTable
class PlatformTableTest(TableTestCases.StandardTableTestCase):
table = PlatformTable
class DeviceTableTest(TableTestCases.StandardTableTestCase):
table = DeviceTable
#
# Device components
#
class ConsolePortTableTest(TableTestCases.StandardTableTestCase):
table = ConsolePortTable
class ConsoleServerPortTableTest(TableTestCases.StandardTableTestCase):
table = ConsoleServerPortTable
class PowerPortTableTest(TableTestCases.StandardTableTestCase):
table = PowerPortTable
class PowerOutletTableTest(TableTestCases.StandardTableTestCase):
table = PowerOutletTable
class InterfaceTableTest(TableTestCases.StandardTableTestCase):
table = InterfaceTable
class FrontPortTableTest(TableTestCases.StandardTableTestCase):
table = FrontPortTable
class RearPortTableTest(TableTestCases.StandardTableTestCase):
table = RearPortTable
class ModuleBayTableTest(TableTestCases.StandardTableTestCase):
table = ModuleBayTable
class DeviceBayTableTest(TableTestCases.StandardTableTestCase):
table = DeviceBayTable
class InventoryItemTableTest(TableTestCases.StandardTableTestCase):
table = InventoryItemTable
class InventoryItemRoleTableTest(TableTestCases.StandardTableTestCase):
table = InventoryItemRoleTable
#
# Connections
#
class ConsoleConnectionTableTest(TableTestCases.StandardTableTestCase):
table = ConsoleConnectionTable
queryset_sources = [
('ConsoleConnectionsListView', ConsolePort.objects.filter(_path__is_complete=True)),
]
class PowerConnectionTableTest(TableTestCases.StandardTableTestCase):
table = PowerConnectionTable
queryset_sources = [
('PowerConnectionsListView', PowerPort.objects.filter(_path__is_complete=True)),
]
class InterfaceConnectionTableTest(TableTestCases.StandardTableTestCase):
table = InterfaceConnectionTable
queryset_sources = [
('InterfaceConnectionsListView', Interface.objects.filter(_path__is_complete=True)),
]
#
# Cables
#
class CableTableTest(TableTestCases.StandardTableTestCase):
table = CableTable
#
# Power
#
class PowerPanelTableTest(TableTestCases.StandardTableTestCase):
table = PowerPanelTable
class PowerFeedTableTest(TableTestCases.StandardTableTestCase):
table = PowerFeedTable
#
# Virtual chassis
#
class VirtualChassisTableTest(TableTestCases.StandardTableTestCase):
table = VirtualChassisTable
#
# Virtual device contexts
#
class VirtualDeviceContextTableTest(TableTestCases.StandardTableTestCase):
table = VirtualDeviceContextTable
#
# MAC addresses
#
class MACAddressTableTest(TableTestCases.StandardTableTestCase):
table = MACAddressTable

View File

@@ -2362,6 +2362,23 @@ class DeviceTestCase(ViewTestCases.PrimaryObjectViewTestCase):
self.remove_permissions('dcim.view_device')
self.assertHttpStatus(self.client.get(url), 403)
def test_device_role_display_colored(self):
parent_role = DeviceRole.objects.create(name='Parent Role', slug='parent-role', color='111111')
child_role = DeviceRole.objects.create(name='Child Role', slug='child-role', parent=parent_role, color='aa00bb')
device = Device.objects.first()
device.role = child_role
device.save()
self.add_permissions('dcim.view_device')
response = self.client.get(device.get_absolute_url())
self.assertHttpStatus(response, 200)
self.assertContains(response, 'Parent Role')
self.assertContains(response, 'Child Role')
self.assertContains(response, 'background-color: #aa00bb')
self.assertNotContains(response, 'background-color: #111111')
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_bulk_import_duplicate_ids_error_message(self):
device = Device.objects.first()
@@ -2961,13 +2978,13 @@ class InterfaceTestCase(ViewTestCases.DeviceComponentViewTestCase):
cls.form_data = {
'device': device.pk,
'name': 'Interface X',
'type': InterfaceTypeChoices.TYPE_1GE_GBIC,
'type': InterfaceTypeChoices.TYPE_OTHER,
'enabled': False,
'bridge': interfaces[4].pk,
'lag': interfaces[3].pk,
'wwn': EUI('01:02:03:04:05:06:07:08', version=64),
'mtu': 65000,
'speed': 1000000,
'speed': 16_000_000_000,
'duplex': 'full',
'mgmt_only': True,
'description': 'A front port',
@@ -2985,13 +3002,13 @@ class InterfaceTestCase(ViewTestCases.DeviceComponentViewTestCase):
cls.bulk_create_data = {
'device': device.pk,
'name': 'Interface [4-6]',
'type': InterfaceTypeChoices.TYPE_1GE_GBIC,
'type': InterfaceTypeChoices.TYPE_OTHER,
'enabled': False,
'bridge': interfaces[4].pk,
'lag': interfaces[3].pk,
'wwn': EUI('01:02:03:04:05:06:07:08', version=64),
'mtu': 2000,
'speed': 100000,
'speed': 16_000_000_000,
'duplex': 'half',
'mgmt_only': True,
'description': 'A front port',

View File

@@ -50,7 +50,7 @@ class RackPanel(panels.ObjectAttributesPanel):
tenant = attrs.RelatedObjectAttr('tenant', linkify=True, grouped_by='group')
status = attrs.ChoiceAttr('status')
rack_type = attrs.RelatedObjectAttr('rack_type', linkify=True, grouped_by='manufacturer')
role = attrs.RelatedObjectAttr('role', linkify=True)
role = attrs.RelatedObjectAttr('role', linkify=True, colored=True)
description = attrs.TextAttr('description')
serial = attrs.TextAttr('serial', label=_('Serial number'), style='font-monospace', copy_button=True)
asset_tag = attrs.TextAttr('asset_tag', style='font-monospace', copy_button=True)
@@ -103,7 +103,7 @@ class DeviceManagementPanel(panels.ObjectAttributesPanel):
title = _('Management')
status = attrs.ChoiceAttr('status')
role = attrs.NestedObjectAttr('role', linkify=True, max_depth=3)
role = attrs.NestedObjectAttr('role', linkify=True, max_depth=3, colored=True)
platform = attrs.NestedObjectAttr('platform', linkify=True, max_depth=3)
primary_ip4 = attrs.TemplatedAttr(
'primary_ip4',
@@ -279,7 +279,7 @@ class InventoryItemPanel(panels.ObjectAttributesPanel):
name = attrs.TextAttr('name')
label = attrs.TextAttr('label')
status = attrs.ChoiceAttr('status')
role = attrs.RelatedObjectAttr('role', linkify=True)
role = attrs.RelatedObjectAttr('role', linkify=True, colored=True)
component = attrs.GenericForeignKeyAttr('component', linkify=True)
manufacturer = attrs.RelatedObjectAttr('manufacturer', linkify=True)
part_id = attrs.TextAttr('part_id', label=_('Part ID'))

View File

@@ -85,8 +85,18 @@ class CustomFieldsDataField(Field):
"values."
)
custom_fields = {cf.name: cf for cf in self._get_custom_fields()}
# Reject any unknown custom field names
invalid_fields = set(data) - set(custom_fields)
if invalid_fields:
raise ValidationError({
field: _("Custom field '{name}' does not exist for this object type.").format(name=field)
for field in sorted(invalid_fields)
})
# Serialize object and multi-object values
for cf in self._get_custom_fields():
for cf in custom_fields.values():
if cf.name in data and data[cf.name] not in CUSTOMFIELD_EMPTY_VALUES and cf.type in (
CustomFieldTypeChoices.TYPE_OBJECT,
CustomFieldTypeChoices.TYPE_MULTIOBJECT

View File

@@ -1,5 +1,6 @@
import datetime
import hashlib
import io
from unittest.mock import MagicMock, patch
from django.contrib.contenttypes.models import ContentType
@@ -1013,10 +1014,14 @@ class ScriptTest(APITestCase):
@classmethod
def setUpTestData(cls):
module = ScriptModule.objects.create(
file_root=ManagedFileRootPathChoices.SCRIPTS,
file_path='script.py',
)
# Avoid trying to import a non-existent on-disk module during setup.
# This test creates the Script row explicitly and monkey-patches
# Script.python_class below.
with patch.object(ScriptModule, 'sync_classes'):
module = ScriptModule.objects.create(
file_root=ManagedFileRootPathChoices.SCRIPTS,
file_path='script.py',
)
script = Script.objects.create(
module=module,
name='Test script',
@@ -1419,9 +1424,20 @@ class ScriptModuleTest(APITestCase):
upload_file = SimpleUploadedFile('test_upload.py', script_content, content_type='text/plain')
mock_storage = MagicMock()
mock_storage.save.return_value = 'test_upload.py'
with patch('extras.api.serializers_.scripts.storages') as mock_storages:
mock_storages.create_storage.return_value = mock_storage
mock_storages.backends = {'scripts': {}}
# The upload serializer writes the file via storages.create_storage(...).save(),
# but ScriptModule.sync_classes() later imports it via storages["scripts"].open().
# Provide both behaviors so the uploaded module can actually be loaded during the test.
mock_storage.open.side_effect = lambda *args, **kwargs: io.BytesIO(script_content)
with (
patch('extras.api.serializers_.scripts.storages') as mock_serializer_storages,
patch('extras.models.mixins.storages') as mock_module_storages,
):
mock_serializer_storages.create_storage.return_value = mock_storage
mock_serializer_storages.backends = {'scripts': {}}
mock_module_storages.__getitem__.return_value = mock_storage
response = self.client.post(
self.url,
{'file': upload_file},
@@ -1432,6 +1448,7 @@ class ScriptModuleTest(APITestCase):
self.assertEqual(response.data['file_path'], 'test_upload.py')
mock_storage.save.assert_called_once()
self.assertTrue(ScriptModule.objects.filter(file_path='test_upload.py').exists())
self.assertTrue(Script.objects.filter(module__file_path='test_upload.py', name='TestScript').exists())
def test_upload_script_module_without_file_fails(self):
self.add_permissions('extras.add_scriptmodule', 'core.add_managedfile')

View File

@@ -7,7 +7,7 @@ from django.test import tag
from django.urls import reverse
from rest_framework import status
from core.models import ObjectType
from core.models import ObjectChange, ObjectType
from dcim.filtersets import SiteFilterSet
from dcim.forms import SiteImportForm
from dcim.models import Manufacturer, Rack, Site
@@ -1194,6 +1194,82 @@ class CustomFieldAPITest(APITestCase):
list(original_cfvs['multiobject_field'])
)
@tag('regression')
def test_update_single_object_rejects_unknown_custom_fields(self):
site2 = Site.objects.get(name='Site 2')
original_cf_data = {**site2.custom_field_data}
url = reverse('dcim-api:site-detail', kwargs={'pk': site2.pk})
self.add_permissions('dcim.change_site')
data = {
'custom_fields': {
'text_field': 'valid',
'thisfieldshouldntexist': 'random text here',
},
}
response = self.client.patch(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
self.assertIn('custom_fields', response.data)
self.assertIn('thisfieldshouldntexist', response.data['custom_fields'])
# Ensure the object was not modified
site2.refresh_from_db()
self.assertEqual(site2.custom_field_data, original_cf_data)
@tag('regression')
def test_update_single_object_prunes_stale_custom_field_data_from_database_and_postchange_data(self):
stale_key = 'thisfieldshouldntexist'
stale_value = 'random text here'
updated_text_value = 'ABCD'
site2 = Site.objects.get(name='Site 2')
original_text_value = site2.custom_field_data['text_field']
object_type = ObjectType.objects.get_for_model(Site)
# Seed stale custom field data directly in the database to mimic a polluted row.
Site.objects.filter(pk=site2.pk).update(
custom_field_data={
**site2.custom_field_data,
stale_key: stale_value,
}
)
site2.refresh_from_db()
self.assertIn(stale_key, site2.custom_field_data)
existing_change_ids = set(
ObjectChange.objects.filter(
changed_object_type=object_type,
changed_object_id=site2.pk,
).values_list('pk', flat=True)
)
url = reverse('dcim-api:site-detail', kwargs={'pk': site2.pk})
self.add_permissions('dcim.change_site')
data = {
'custom_fields': {
'text_field': updated_text_value,
},
}
response = self.client.patch(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
site2.refresh_from_db()
self.assertEqual(site2.cf['text_field'], updated_text_value)
self.assertNotIn(stale_key, site2.custom_field_data)
object_changes = ObjectChange.objects.filter(
changed_object_type=object_type,
changed_object_id=site2.pk,
).exclude(pk__in=existing_change_ids)
self.assertEqual(object_changes.count(), 1)
object_change = object_changes.get()
self.assertEqual(object_change.prechange_data['custom_fields']['text_field'], original_text_value)
self.assertEqual(object_change.postchange_data['custom_fields']['text_field'], updated_text_value)
self.assertNotIn(stale_key, object_change.postchange_data['custom_fields'])
def test_specify_related_object_by_attr(self):
site1 = Site.objects.get(name='Site 1')
vlans = VLAN.objects.all()[:3]

View File

@@ -1,10 +1,15 @@
import io
import tempfile
from pathlib import Path
from unittest.mock import patch
from django.contrib.contenttypes.models import ContentType
from django.core.files.base import ContentFile
from django.core.files.storage import Storage
from django.core.files.uploadedfile import SimpleUploadedFile
from django.forms import ValidationError
from django.test import TestCase, tag
from PIL import Image
from core.models import AutoSyncRecord, DataSource, ObjectType
from dcim.models import Device, DeviceRole, DeviceType, Location, Manufacturer, Platform, Region, Site, SiteGroup
@@ -14,10 +19,50 @@ from utilities.exceptions import AbortRequest
from virtualization.models import Cluster, ClusterGroup, ClusterType, VirtualMachine
class OverwriteStyleMemoryStorage(Storage):
"""
In-memory storage that mimics overwrite-style backends by returning the
incoming name unchanged from get_available_name().
"""
def __init__(self):
self.files = {}
def _open(self, name, mode='rb'):
return ContentFile(self.files[name], name=name)
def _save(self, name, content):
self.files[name] = content.read()
return name
def delete(self, name):
self.files.pop(name, None)
def exists(self, name):
return name in self.files
def get_available_name(self, name, max_length=None):
return name
def get_alternative_name(self, file_root, file_ext):
return f'{file_root}_sdmmer4{file_ext}'
def listdir(self, path):
return [], list(self.files)
def size(self, name):
return len(self.files[name])
def url(self, name):
return f'https://example.invalid/{name}'
class ImageAttachmentTests(TestCase):
@classmethod
def setUpTestData(cls):
cls.ct_rack = ContentType.objects.get_by_natural_key('dcim', 'rack')
cls.ct_site = ContentType.objects.get_by_natural_key('dcim', 'site')
cls.site = Site.objects.create(name='Site 1')
cls.image_content = b''
def _stub_image_attachment(self, object_id, image_filename, name=None):
@@ -41,6 +86,15 @@ class ImageAttachmentTests(TestCase):
)
return ia
def _uploaded_png(self, filename):
image = io.BytesIO()
Image.new('RGB', (1, 1)).save(image, format='PNG')
return SimpleUploadedFile(
name=filename,
content=image.getvalue(),
content_type='image/png',
)
def test_filename_strips_expected_prefix(self):
"""
Tests that the filename of the image attachment is stripped of the expected
@@ -89,6 +143,37 @@ class ImageAttachmentTests(TestCase):
ia = self._stub_image_attachment(12, 'image-attachments/rack_12_file.png', name='')
self.assertEqual('file.png', str(ia))
def test_duplicate_uploaded_names_get_suffixed_with_overwrite_style_storage(self):
storage = OverwriteStyleMemoryStorage()
field = ImageAttachment._meta.get_field('image')
with patch.object(field, 'storage', storage):
first = ImageAttachment(
object_type=self.ct_site,
object_id=self.site.pk,
image=self._uploaded_png('action-buttons.png'),
)
first.save()
second = ImageAttachment(
object_type=self.ct_site,
object_id=self.site.pk,
image=self._uploaded_png('action-buttons.png'),
)
second.save()
base_name = f'image-attachments/site_{self.site.pk}_action-buttons.png'
suffixed_name = f'image-attachments/site_{self.site.pk}_action-buttons_sdmmer4.png'
self.assertEqual(first.image.name, base_name)
self.assertEqual(second.image.name, suffixed_name)
self.assertNotEqual(first.image.name, second.image.name)
self.assertEqual(first.filename, 'action-buttons.png')
self.assertEqual(second.filename, 'action-buttons_sdmmer4.png')
self.assertCountEqual(storage.files.keys(), {base_name, suffixed_name})
class TagTest(TestCase):

View File

@@ -1,24 +1,93 @@
from django.test import RequestFactory, TestCase, tag
from extras.models import EventRule
from extras.tables import EventRuleTable
from extras.models import Bookmark, Notification, Subscription
from extras.tables import *
from utilities.testing import TableTestCases
@tag('regression')
class EventRuleTableTest(TestCase):
def test_every_orderable_field_does_not_throw_exception(self):
rule = EventRule.objects.all()
disallowed = {
'actions',
}
class CustomFieldTableTest(TableTestCases.StandardTableTestCase):
table = CustomFieldTable
orderable_columns = [
column.name for column in EventRuleTable(rule).columns if column.orderable and column.name not in disallowed
]
fake_request = RequestFactory().get('/')
for col in orderable_columns:
for direction in ('-', ''):
table = EventRuleTable(rule)
table.order_by = f'{direction}{col}'
table.as_html(fake_request)
class CustomFieldChoiceSetTableTest(TableTestCases.StandardTableTestCase):
table = CustomFieldChoiceSetTable
class CustomLinkTableTest(TableTestCases.StandardTableTestCase):
table = CustomLinkTable
class ExportTemplateTableTest(TableTestCases.StandardTableTestCase):
table = ExportTemplateTable
class SavedFilterTableTest(TableTestCases.StandardTableTestCase):
table = SavedFilterTable
class TableConfigTableTest(TableTestCases.StandardTableTestCase):
table = TableConfigTable
class BookmarkTableTest(TableTestCases.StandardTableTestCase):
table = BookmarkTable
# The list view for this table lives in account.views (not extras.views),
# so auto-discovery cannot find it. Provide an explicit queryset source.
queryset_sources = [
('Bookmark.objects.all()', Bookmark.objects.all()),
]
class NotificationGroupTableTest(TableTestCases.StandardTableTestCase):
table = NotificationGroupTable
class NotificationTableTest(TableTestCases.StandardTableTestCase):
table = NotificationTable
# The list view for this table lives in account.views (not extras.views),
# so auto-discovery cannot find it. Provide an explicit queryset source.
queryset_sources = [
('Notification.objects.all()', Notification.objects.all()),
]
class SubscriptionTableTest(TableTestCases.StandardTableTestCase):
table = SubscriptionTable
# The list view for this table lives in account.views (not extras.views),
# so auto-discovery cannot find it. Provide an explicit queryset source.
queryset_sources = [
('Subscription.objects.all()', Subscription.objects.all()),
]
class WebhookTableTest(TableTestCases.StandardTableTestCase):
table = WebhookTable
class EventRuleTableTest(TableTestCases.StandardTableTestCase):
table = EventRuleTable
class TagTableTest(TableTestCases.StandardTableTestCase):
table = TagTable
class ConfigContextProfileTableTest(TableTestCases.StandardTableTestCase):
table = ConfigContextProfileTable
class ConfigContextTableTest(TableTestCases.StandardTableTestCase):
table = ConfigContextTable
class ConfigTemplateTableTest(TableTestCases.StandardTableTestCase):
table = ConfigTemplateTable
class ImageAttachmentTableTest(TableTestCases.StandardTableTestCase):
table = ImageAttachmentTable
class JournalEntryTableTest(TableTestCases.StandardTableTestCase):
table = JournalEntryTable

View File

@@ -1,10 +1,12 @@
from types import SimpleNamespace
from unittest.mock import patch
from django.contrib.contenttypes.models import ContentType
from django.core.files.storage import Storage
from django.test import TestCase
from extras.models import ExportTemplate
from extras.utils import filename_from_model, image_upload
from extras.models import ExportTemplate, ImageAttachment
from extras.utils import _build_image_attachment_path, filename_from_model, image_upload
from tenancy.models import ContactGroup, TenantGroup
from wireless.models import WirelessLANGroup
@@ -22,6 +24,25 @@ class FilenameFromModelTests(TestCase):
self.assertEqual(filename_from_model(model), expected)
class OverwriteStyleStorage(Storage):
"""
Mimic an overwrite-style backend (for example, S3 with file_overwrite=True),
where get_available_name() returns the incoming name unchanged.
"""
def __init__(self, existing_names=None):
self.existing_names = set(existing_names or [])
def exists(self, name):
return name in self.existing_names
def get_available_name(self, name, max_length=None):
return name
def get_alternative_name(self, file_root, file_ext):
return f'{file_root}_sdmmer4{file_ext}'
class ImageUploadTests(TestCase):
@classmethod
def setUpTestData(cls):
@@ -31,16 +52,18 @@ class ImageUploadTests(TestCase):
def _stub_instance(self, object_id=12, name=None):
"""
Creates a minimal stub for use with the `image_upload()` function.
This method generates an instance of `SimpleNamespace` containing a set
of attributes required to simulate the expected input for the
`image_upload()` method.
It is designed to simplify testing or processing by providing a
lightweight representation of an object.
Creates a minimal stub for use with image attachment path generation.
"""
return SimpleNamespace(object_type=self.ct_rack, object_id=object_id, name=name)
def _bound_instance(self, *, storage, object_id=12, name=None, max_length=100):
return SimpleNamespace(
object_type=self.ct_rack,
object_id=object_id,
name=name,
image=SimpleNamespace(field=SimpleNamespace(storage=storage, max_length=max_length)),
)
def _second_segment(self, path: str):
"""
Extracts and returns the portion of the input string after the
@@ -53,7 +76,7 @@ class ImageUploadTests(TestCase):
Tests handling of a Windows file path with a fake directory and extension.
"""
inst = self._stub_instance(name=None)
path = image_upload(inst, r'C:\fake_path\MyPhoto.JPG')
path = _build_image_attachment_path(inst, r'C:\fake_path\MyPhoto.JPG')
# Base directory and single-level path
seg2 = self._second_segment(path)
self.assertTrue(path.startswith('image-attachments/rack_12_'))
@@ -67,7 +90,7 @@ class ImageUploadTests(TestCase):
create subdirectories.
"""
inst = self._stub_instance(name='5/31/23')
path = image_upload(inst, 'image.png')
path = _build_image_attachment_path(inst, 'image.png')
seg2 = self._second_segment(path)
self.assertTrue(seg2.startswith('rack_12_'))
self.assertNotIn('/', seg2)
@@ -80,7 +103,7 @@ class ImageUploadTests(TestCase):
into a single directory name without creating subdirectories.
"""
inst = self._stub_instance(name=r'5\31\23')
path = image_upload(inst, 'image_name.png')
path = _build_image_attachment_path(inst, 'image_name.png')
seg2 = self._second_segment(path)
self.assertTrue(seg2.startswith('rack_12_'))
@@ -93,7 +116,7 @@ class ImageUploadTests(TestCase):
Tests the output path format generated by the `image_upload` function.
"""
inst = self._stub_instance(object_id=99, name='label')
path = image_upload(inst, 'a.webp')
path = _build_image_attachment_path(inst, 'a.webp')
# The second segment must begin with "rack_99_"
seg2 = self._second_segment(path)
self.assertTrue(seg2.startswith('rack_99_'))
@@ -105,7 +128,7 @@ class ImageUploadTests(TestCase):
is omitted.
"""
inst = self._stub_instance(name='test')
path = image_upload(inst, 'document.txt')
path = _build_image_attachment_path(inst, 'document.txt')
seg2 = self._second_segment(path)
self.assertTrue(seg2.startswith('rack_12_test'))
@@ -121,7 +144,7 @@ class ImageUploadTests(TestCase):
# Suppose the instance name has surrounding whitespace and
# extra slashes.
inst = self._stub_instance(name=' my/complex\\name ')
path = image_upload(inst, 'irrelevant.png')
path = _build_image_attachment_path(inst, 'irrelevant.png')
# The output should be flattened and sanitized.
# We expect the name to be transformed into a valid filename without
@@ -141,7 +164,7 @@ class ImageUploadTests(TestCase):
for name in ['2025/09/12', r'2025\09\12']:
with self.subTest(name=name):
inst = self._stub_instance(name=name)
path = image_upload(inst, 'x.jpeg')
path = _build_image_attachment_path(inst, 'x.jpeg')
seg2 = self._second_segment(path)
self.assertTrue(seg2.startswith('rack_12_'))
self.assertNotIn('/', seg2)
@@ -154,7 +177,49 @@ class ImageUploadTests(TestCase):
SuspiciousFileOperation, the fallback default is used.
"""
inst = self._stub_instance(name=' ')
path = image_upload(inst, 'sample.png')
path = _build_image_attachment_path(inst, 'sample.png')
# Expect the fallback name 'unnamed' to be used.
self.assertIn('unnamed', path)
self.assertTrue(path.startswith('image-attachments/rack_12_'))
def test_image_upload_preserves_original_name_when_available(self):
inst = self._bound_instance(
storage=OverwriteStyleStorage(),
name='action-buttons',
)
path = image_upload(inst, 'action-buttons.png')
self.assertEqual(path, 'image-attachments/rack_12_action-buttons.png')
def test_image_upload_uses_base_collision_handling_with_overwrite_style_storage(self):
inst = self._bound_instance(
storage=OverwriteStyleStorage(existing_names={'image-attachments/rack_12_action-buttons.png'}),
name='action-buttons',
)
path = image_upload(inst, 'action-buttons.png')
self.assertEqual(
path,
'image-attachments/rack_12_action-buttons_sdmmer4.png',
)
def test_image_field_generate_filename_uses_image_upload_collision_handling(self):
field = ImageAttachment._meta.get_field('image')
instance = ImageAttachment(
object_type=self.ct_rack,
object_id=12,
)
with patch.object(
field,
'storage',
OverwriteStyleStorage(existing_names={'image-attachments/rack_12_action-buttons.png'}),
):
path = field.generate_filename(instance, 'action-buttons.png')
self.assertEqual(
path,
'image-attachments/rack_12_action-buttons_sdmmer4.png',
)

View File

@@ -924,7 +924,14 @@ class ScriptValidationErrorTest(TestCase):
@classmethod
def setUpTestData(cls):
module = ScriptModule.objects.create(file_root=ManagedFileRootPathChoices.SCRIPTS, file_path='test_script.py')
# Avoid trying to import a non-existent on-disk module during setup.
# This test creates the Script row explicitly and monkey-patches
# Script.python_class below.
with patch.object(ScriptModule, 'sync_classes'):
module = ScriptModule.objects.create(
file_root=ManagedFileRootPathChoices.SCRIPTS,
file_path='test_script.py',
)
cls.script = Script.objects.create(module=module, name='Test script', is_executable=True)
def setUp(self):
@@ -986,7 +993,14 @@ class ScriptDefaultValuesTest(TestCase):
@classmethod
def setUpTestData(cls):
module = ScriptModule.objects.create(file_root=ManagedFileRootPathChoices.SCRIPTS, file_path='test_script.py')
# Avoid trying to import a non-existent on-disk module during setup.
# This test creates the Script row explicitly and monkey-patches
# Script.python_class below.
with patch.object(ScriptModule, 'sync_classes'):
module = ScriptModule.objects.create(
file_root=ManagedFileRootPathChoices.SCRIPTS,
file_path='test_script.py',
)
cls.script = Script.objects.create(module=module, name='Test script', is_executable=True)
def setUp(self):

View File

@@ -2,7 +2,7 @@ import importlib
from pathlib import Path
from django.core.exceptions import ImproperlyConfigured, SuspiciousFileOperation
from django.core.files.storage import default_storage
from django.core.files.storage import Storage, default_storage
from django.core.files.utils import validate_file_name
from django.db import models
from django.db.models import Q
@@ -67,15 +67,13 @@ def is_taggable(obj):
return False
def image_upload(instance, filename):
def _build_image_attachment_path(instance, filename, *, storage=default_storage):
"""
Return a path for uploading image attachments.
Build a deterministic relative path for an image attachment.
- Normalizes browser paths (e.g., C:\\fake_path\\photo.jpg)
- Uses the instance.name if provided (sanitized to a *basename*, no ext)
- Prefixes with a machine-friendly identifier
Note: Relies on Django's default_storage utility.
"""
upload_dir = 'image-attachments'
default_filename = 'unnamed'
@@ -92,22 +90,38 @@ def image_upload(instance, filename):
# Rely on Django's get_valid_filename to perform sanitization.
stem = (instance.name or file_path.stem).strip()
try:
safe_stem = default_storage.get_valid_name(stem)
safe_stem = storage.get_valid_name(stem)
except SuspiciousFileOperation:
safe_stem = default_filename
# Append the uploaded extension only if it's an allowed image type
final_name = f"{safe_stem}.{ext}" if ext in allowed_img_extensions else safe_stem
final_name = f'{safe_stem}.{ext}' if ext in allowed_img_extensions else safe_stem
# Create a machine-friendly prefix from the instance
prefix = f"{instance.object_type.model}_{instance.object_id}"
name_with_path = f"{upload_dir}/{prefix}_{final_name}"
prefix = f'{instance.object_type.model}_{instance.object_id}'
name_with_path = f'{upload_dir}/{prefix}_{final_name}'
# Validate the generated relative path (blocks absolute/traversal)
validate_file_name(name_with_path, allow_relative_path=True)
return name_with_path
def image_upload(instance, filename):
"""
Return a relative upload path for an image attachment, applying Django's
usual suffix-on-collision behavior regardless of storage backend.
"""
field = instance.image.field
name_with_path = _build_image_attachment_path(instance, filename, storage=field.storage)
# Intentionally call Django's base Storage implementation here. Some
# backends override get_available_name() to reuse the incoming name
# unchanged, but we want Django's normal suffix-on-collision behavior
# while still dispatching exists() / get_alternative_name() to the
# configured storage instance.
return Storage.get_available_name(field.storage, name_with_path, max_length=field.max_length)
def is_script(obj):
"""
Returns True if the object is a Script or Report.

View File

@@ -1,9 +1,10 @@
from django.test import RequestFactory, TestCase
from netaddr import IPNetwork
from ipam.models import IPAddress, IPRange, Prefix
from ipam.tables import AnnotatedIPAddressTable
from ipam.models import FHRPGroupAssignment, IPAddress, IPRange, Prefix
from ipam.tables import *
from ipam.utils import annotate_ip_space
from utilities.testing import TableTestCases
class AnnotatedIPAddressTableTest(TestCase):
@@ -168,3 +169,85 @@ class AnnotatedIPAddressTableTest(TestCase):
# Pools are fully usable
self.assertEqual(available.first_ip, '2001:db8:1::/126')
self.assertEqual(available.size, 4)
#
# Table ordering tests
#
class VRFTableTest(TableTestCases.StandardTableTestCase):
table = VRFTable
class RouteTargetTableTest(TableTestCases.StandardTableTestCase):
table = RouteTargetTable
class RIRTableTest(TableTestCases.StandardTableTestCase):
table = RIRTable
class AggregateTableTest(TableTestCases.StandardTableTestCase):
table = AggregateTable
class RoleTableTest(TableTestCases.StandardTableTestCase):
table = RoleTable
class PrefixTableTest(TableTestCases.StandardTableTestCase):
table = PrefixTable
class IPRangeTableTest(TableTestCases.StandardTableTestCase):
table = IPRangeTable
class IPAddressTableTest(TableTestCases.StandardTableTestCase):
table = IPAddressTable
class FHRPGroupTableTest(TableTestCases.StandardTableTestCase):
table = FHRPGroupTable
class FHRPGroupAssignmentTableTest(TableTestCases.StandardTableTestCase):
table = FHRPGroupAssignmentTable
# No ObjectListView exists for this table; it is only rendered inline on
# the FHRPGroup detail view. Provide an explicit queryset source.
queryset_sources = [
('FHRPGroupAssignment.objects.all()', FHRPGroupAssignment.objects.all()),
]
class VLANGroupTableTest(TableTestCases.StandardTableTestCase):
table = VLANGroupTable
class VLANTableTest(TableTestCases.StandardTableTestCase):
table = VLANTable
class VLANTranslationPolicyTableTest(TableTestCases.StandardTableTestCase):
table = VLANTranslationPolicyTable
class VLANTranslationRuleTableTest(TableTestCases.StandardTableTestCase):
table = VLANTranslationRuleTable
class ASNRangeTableTest(TableTestCases.StandardTableTestCase):
table = ASNRangeTable
class ASNTableTest(TableTestCases.StandardTableTestCase):
table = ASNTable
class ServiceTemplateTableTest(TableTestCases.StandardTableTestCase):
table = ServiceTemplateTable
class ServiceTableTest(TableTestCases.StandardTableTestCase):
table = ServiceTable

View File

@@ -95,9 +95,6 @@ class ValidatedModelSerializer(BaseModelSerializer):
attrs = data.copy()
# Remove custom field data (if any) prior to model validation
attrs.pop('custom_fields', None)
# Skip ManyToManyFields
opts = self.Meta.model._meta
m2m_values = {}
@@ -116,4 +113,8 @@ class ValidatedModelSerializer(BaseModelSerializer):
# Skip uniqueness validation of individual fields inside `full_clean()` (this is handled by the serializer)
instance.full_clean(validate_unique=False)
# Preserve any normalization performed by model.clean() (e.g. stale custom field pruning)
if 'custom_field_data' in attrs:
data['custom_field_data'] = instance.custom_field_data
return data

View File

@@ -467,7 +467,7 @@ class JobsMixin(models.Model):
"""
Return a list of the most recent jobs for this instance.
"""
return self.jobs.filter(status__in=JobStatusChoices.TERMINAL_STATE_CHOICES).order_by('-created').defer('data')
return self.jobs.filter(status__in=JobStatusChoices.TERMINAL_STATE_CHOICES).order_by('-started').defer('data')
class JournalingMixin(models.Model):

View File

@@ -2,3 +2,10 @@ from django.dispatch import Signal
# Signals that a model has completed its clean() method
post_clean = Signal()
# Sent after objects of a given model are created via raw save.
# Expected call signature: post_raw_create.send(sender=MyModel, pks=[...])
# Provides: pks (list) - PKs of the newly created objects.
# Callers must ensure all related objects (e.g. M2M, dependent rows) are in place
# before sending, as receivers may query related data to perform post-create work.
post_raw_create = Signal()

View File

@@ -256,13 +256,15 @@ class RelatedObjectAttr(ObjectAttribute):
linkify (bool): If True, the rendered value will be hyperlinked to the related object's detail view
grouped_by (str): A second-order object to annotate alongside the related object; for example, an attribute
representing the dcim.Site model might specify grouped_by="region"
colored (bool): If True, render the object as a colored badge when it exposes a `color` attribute
"""
template_name = 'ui/attrs/object.html'
def __init__(self, *args, linkify=None, grouped_by=None, **kwargs):
def __init__(self, *args, linkify=None, grouped_by=None, colored=False, **kwargs):
super().__init__(*args, **kwargs)
self.linkify = linkify
self.grouped_by = grouped_by
self.colored = colored
def get_context(self, obj, context):
value = self.get_value(obj)
@@ -270,6 +272,7 @@ class RelatedObjectAttr(ObjectAttribute):
return {
'linkify': self.linkify,
'group': group,
'colored': self.colored,
}
@@ -327,6 +330,7 @@ class RelatedObjectListAttr(RelatedObjectAttr):
return {
'linkify': self.linkify,
'colored': self.colored,
'items': [
{
'value': item,
@@ -358,13 +362,15 @@ class NestedObjectAttr(ObjectAttribute):
Parameters:
linkify (bool): If True, the rendered value will be hyperlinked to the related object's detail view
max_depth (int): Maximum number of ancestors to display (default: all)
colored (bool): If True, render the object as a colored badge when it exposes a `color` attribute
"""
template_name = 'ui/attrs/nested_object.html'
def __init__(self, *args, linkify=None, max_depth=None, **kwargs):
def __init__(self, *args, linkify=None, max_depth=None, colored=False, **kwargs):
super().__init__(*args, **kwargs)
self.linkify = linkify
self.max_depth = max_depth
self.colored = colored
def get_context(self, obj, context):
value = self.get_value(obj)
@@ -374,6 +380,7 @@ class NestedObjectAttr(ObjectAttribute):
return {
'nodes': nodes,
'linkify': self.linkify,
'colored': self.colored,
}

View File

@@ -11,7 +11,7 @@
<h2 class="card-header" id="module{{ module.pk }}">
<i class="mdi mdi-file-document-outline"></i> {{ module }}
<div class="card-actions">
{% if perms.extras.edit_scriptmodule %}
{% if perms.extras.change_scriptmodule %}
<a href="{% url 'extras:scriptmodule_edit' pk=module.pk %}" class="btn btn-ghost-warning btn-sm">
<i class="mdi mdi-pencil" aria-hidden="true"></i> {% trans "Edit" %}
</a>
@@ -54,7 +54,7 @@
<td>{{ script.python_class.description|markdown|placeholder }}</td>
{% if last_job %}
<td>
<a href="{% url 'extras:script_result' job_pk=last_job.pk %}">{{ last_job.created|isodatetime }}</a>
<a href="{% url 'extras:script_result' job_pk=last_job.pk %}">{{ last_job.started|isodatetime }}</a>
</td>
<td>
{% badge last_job.get_status_display last_job.get_status_color %}

View File

@@ -1,7 +1,15 @@
<ol class="breadcrumb" aria-label="breadcrumbs">
{% for node in nodes %}
<li class="breadcrumb-item">
{% if linkify %}
{% if forloop.last and colored and node.color %}
{% if linkify %}
{% with badge_url=node.get_absolute_url %}
{% badge node hex_color=node.color url=badge_url %}
{% endwith %}
{% else %}
{% badge node hex_color=node.color %}
{% endif %}
{% elif linkify %}
<a href="{{ node.get_absolute_url }}">{{ node }}</a>
{% else %}
{{ node }}

View File

@@ -5,10 +5,34 @@
{% if linkify %}{{ group|linkify }}{% else %}{{ group }}{% endif %}
</li>
<li class="breadcrumb-item">
{% if linkify %}{{ value|linkify }}{% else %}{{ value }}{% endif %}
{% if colored and value.color %}
{% if linkify %}
{% with badge_url=value.get_absolute_url %}
{% badge value hex_color=value.color url=badge_url %}
{% endwith %}
{% else %}
{% badge value hex_color=value.color %}
{% endif %}
{% elif linkify %}
{{ value|linkify }}
{% else %}
{{ value }}
{% endif %}
</li>
</ol>
{% else %}
{# Display only the object #}
{% if linkify %}{{ value|linkify }}{% else %}{{ value }}{% endif %}
{% if colored and value.color %}
{% if linkify %}
{% with badge_url=value.get_absolute_url %}
{% badge value hex_color=value.color url=badge_url %}
{% endwith %}
{% else %}
{% badge value hex_color=value.color %}
{% endif %}
{% elif linkify %}
{{ value|linkify }}
{% else %}
{{ value }}
{% endif %}
{% endif %}

View File

@@ -1,7 +1,7 @@
<ul class="list-unstyled mb-0">
{% for item in items %}
<li>
{% include "ui/attrs/object.html" with value=item.value group=item.group linkify=linkify only %}
{% include "ui/attrs/object.html" with value=item.value group=item.group linkify=linkify colored=colored only %}
</li>
{% endfor %}
{% if overflow_indicator %}

View File

@@ -0,0 +1,26 @@
from tenancy.tables import *
from utilities.testing import TableTestCases
class TenantGroupTableTest(TableTestCases.StandardTableTestCase):
table = TenantGroupTable
class TenantTableTest(TableTestCases.StandardTableTestCase):
table = TenantTable
class ContactGroupTableTest(TableTestCases.StandardTableTestCase):
table = ContactGroupTable
class ContactRoleTableTest(TableTestCases.StandardTableTestCase):
table = ContactRoleTable
class ContactTableTest(TableTestCases.StandardTableTestCase):
table = ContactTable
class ContactAssignmentTableTest(TableTestCases.StandardTableTestCase):
table = ContactAssignmentTable

File diff suppressed because it is too large Load Diff

View File

@@ -1,24 +1,26 @@
from django.test import RequestFactory, TestCase, tag
from users.models import Token
from users.tables import TokenTable
from users.tables import *
from utilities.testing import TableTestCases
class TokenTableTest(TestCase):
@tag('regression')
def test_every_orderable_field_does_not_throw_exception(self):
tokens = Token.objects.all()
disallowed = {'actions'}
class TokenTableTest(TableTestCases.StandardTableTestCase):
table = TokenTable
orderable_columns = [
column.name for column in TokenTable(tokens).columns
if column.orderable and column.name not in disallowed
]
fake_request = RequestFactory().get("/")
for col in orderable_columns:
for direction in ('-', ''):
with self.subTest(col=col, direction=direction):
table = TokenTable(tokens)
table.order_by = f'{direction}{col}'
table.as_html(fake_request)
class UserTableTest(TableTestCases.StandardTableTestCase):
table = UserTable
class GroupTableTest(TableTestCases.StandardTableTestCase):
table = GroupTable
class ObjectPermissionTableTest(TableTestCases.StandardTableTestCase):
table = ObjectPermissionTable
class OwnerGroupTableTest(TableTestCases.StandardTableTestCase):
table = OwnerGroupTable
class OwnerTableTest(TableTestCases.StandardTableTestCase):
table = OwnerTable

View File

@@ -6,7 +6,6 @@ from django.core.exceptions import ObjectDoesNotExist
from django.db import models
from django.db.models.fields.mixins import FieldCacheMixin
from django.utils.functional import cached_property
from django.utils.safestring import mark_safe
from django.utils.translation import gettext_lazy as _
from .forms.widgets import ColorSelect
@@ -31,7 +30,6 @@ class ColorField(models.CharField):
def formfield(self, **kwargs):
kwargs['widget'] = ColorSelect
kwargs['help_text'] = mark_safe(_('RGB color in hexadecimal. Example: ') + '<code>00ff00</code>')
return super().formfield(**kwargs)

View File

@@ -7,9 +7,12 @@ from django_filters.constants import EMPTY_VALUES
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import extend_schema_field
from .forms.fields import BigIntegerField
__all__ = (
'ContentTypeFilter',
'MultiValueArrayFilter',
'MultiValueBigNumberFilter',
'MultiValueCharFilter',
'MultiValueContentTypeFilter',
'MultiValueDateFilter',
@@ -77,6 +80,11 @@ class MultiValueNumberFilter(django_filters.MultipleChoiceFilter):
field_class = multivalue_field_factory(forms.IntegerField)
@extend_schema_field(OpenApiTypes.INT64)
class MultiValueBigNumberFilter(MultiValueNumberFilter):
field_class = multivalue_field_factory(BigIntegerField)
@extend_schema_field(OpenApiTypes.DECIMAL)
class MultiValueDecimalFilter(django_filters.MultipleChoiceFilter):
field_class = multivalue_field_factory(forms.DecimalField)

View File

@@ -2,6 +2,7 @@ import json
from django import forms
from django.conf import settings
from django.db.models import BigIntegerField as BigIntegerModelField
from django.db.models import Count
from django.forms.fields import InvalidJSONInput
from django.forms.fields import JSONField as _JSONField
@@ -13,17 +14,39 @@ from utilities.forms import widgets
from utilities.validators import EnhancedURLValidator
__all__ = (
'BigIntegerField',
'ColorField',
'CommentField',
'JSONField',
'LaxURLField',
'MACAddressField',
'PositiveBigIntegerField',
'QueryField',
'SlugField',
'TagFilterField',
)
class BigIntegerField(forms.IntegerField):
"""
An IntegerField constrained to the range of a signed 64-bit integer.
"""
def __init__(self, *args, **kwargs):
kwargs.setdefault('min_value', -BigIntegerModelField.MAX_BIGINT - 1)
kwargs.setdefault('max_value', BigIntegerModelField.MAX_BIGINT)
super().__init__(*args, **kwargs)
class PositiveBigIntegerField(BigIntegerField):
"""
An IntegerField constrained to the range supported by Django's
PositiveBigIntegerField model field.
"""
def __init__(self, *args, **kwargs):
kwargs.setdefault('min_value', 0)
super().__init__(*args, **kwargs)
class QueryField(forms.CharField):
"""
A CharField subclass used for global search/query fields in filter forms.

View File

@@ -1 +1,5 @@
{% if value or show_empty %}<span class="badge text-bg-{{ bg_color }}">{{ value }}</span>{% endif %}
{% load helpers %}
{% if value or show_empty %}
{% if url %}<a href="{{ url }}">{% endif %}<span class="badge{% if not hex_color %} text-bg-{{ bg_color }}{% endif %}"{% if hex_color %} style="color: {{ hex_color|fgcolor }}; background-color: #{{ hex_color }}"{% endif %}>{{ value }}</span>{% if url %}</a>{% endif %}
{% endif %}

View File

@@ -58,18 +58,22 @@ def customfield_value(customfield, value):
@register.inclusion_tag('builtins/badge.html')
def badge(value, bg_color=None, show_empty=False):
def badge(value, bg_color=None, hex_color=None, url=None, show_empty=False):
"""
Display the specified number as a badge.
Display the specified value as a badge.
Args:
value: The value to be displayed within the badge
bg_color: Background color CSS name
hex_color: Background color in hexadecimal RRGGBB format
url: If provided, wrap the badge in a hyperlink
show_empty: If true, display the badge even if value is None or zero
"""
return {
'value': value,
'bg_color': bg_color or 'secondary',
'hex_color': hex_color.lstrip('#') if hex_color else None,
'url': url,
'show_empty': show_empty,
}

View File

@@ -186,26 +186,52 @@ def action_url(parser, token):
return ActionURLNode(model, action, kwargs, asvar)
def _format_speed(speed, divisor, unit):
"""
Format a speed value with a given divisor and unit.
Handles decimal values and strips trailing zeros for clean output.
"""
whole, remainder = divmod(speed, divisor)
if remainder == 0:
return f'{whole} {unit}'
# Divisors are powers of 10, so len(str(divisor)) - 1 matches the decimal precision.
precision = len(str(divisor)) - 1
fraction = f'{remainder:0{precision}d}'.rstrip('0')
return f'{whole}.{fraction} {unit}'
@register.filter()
def humanize_speed(speed):
"""
Humanize speeds given in Kbps. Examples:
Humanize speeds given in Kbps, always using the largest appropriate unit.
1544 => "1.544 Mbps"
100000 => "100 Mbps"
10000000 => "10 Gbps"
Decimal values are displayed when the result is not a whole number;
trailing zeros after the decimal point are stripped for clean output.
Examples:
1_544 => "1.544 Mbps"
100_000 => "100 Mbps"
1_000_000 => "1 Gbps"
2_500_000 => "2.5 Gbps"
10_000_000 => "10 Gbps"
800_000_000 => "800 Gbps"
1_600_000_000 => "1.6 Tbps"
"""
if not speed:
return ''
if speed >= 1000000000 and speed % 1000000000 == 0:
return '{} Tbps'.format(int(speed / 1000000000))
if speed >= 1000000 and speed % 1000000 == 0:
return '{} Gbps'.format(int(speed / 1000000))
if speed >= 1000 and speed % 1000 == 0:
return '{} Mbps'.format(int(speed / 1000))
if speed >= 1000:
return '{} Mbps'.format(float(speed) / 1000)
return '{} Kbps'.format(speed)
speed = int(speed)
if speed >= 1_000_000_000:
return _format_speed(speed, 1_000_000_000, 'Tbps')
if speed >= 1_000_000:
return _format_speed(speed, 1_000_000, 'Gbps')
if speed >= 1_000:
return _format_speed(speed, 1_000, 'Mbps')
return f'{speed} Kbps'
def _humanize_capacity(value, divisor=1000):

View File

@@ -1,5 +1,6 @@
from .api import *
from .base import *
from .filtersets import *
from .tables import *
from .utils import *
from .views import *

View File

@@ -0,0 +1,157 @@
import inspect
from importlib import import_module
from django.test import RequestFactory
from netbox.views import generic
from .base import TestCase
__all__ = (
"ModelTableTestCase",
"TableTestCases",
)
class ModelTableTestCase(TestCase):
"""
Shared helpers for model-backed table ordering smoke tests.
Concrete subclasses should set `table` and may override `queryset_sources`,
`queryset_source_view_classes`, or `excluded_orderable_columns` as needed.
"""
table = None
excluded_orderable_columns = frozenset({"actions"})
# Optional explicit override for odd cases
queryset_sources = None
# Only these view types are considered sortable queryset sources by default
queryset_source_view_classes = (generic.ObjectListView,)
@classmethod
def validate_table_test_case(cls):
"""
Assert that the test case is correctly configured with a model-backed table.
Raises:
AssertionError: If ``table`` is not set or is not backed by a Django model.
"""
if cls.table is None:
raise AssertionError(f"{cls.__name__} must define `table`")
if getattr(cls.table._meta, "model", None) is None:
raise AssertionError(f"{cls.__name__}.table must be model-backed")
def get_request(self):
"""
Build a minimal ``GET`` request authenticated as the test user.
"""
request = RequestFactory().get("/")
request.user = self.user
return request
def get_table(self, queryset):
"""
Instantiate the table class under test with the given *queryset*.
"""
return self.table(queryset)
@classmethod
def is_queryset_source_view(cls, view):
"""
Return ``True`` if *view* is a list-style view class that declares
this test case's table and exposes a usable queryset.
"""
model = cls.table._meta.model
app_label = model._meta.app_label
return (
inspect.isclass(view)
and view.__module__.startswith(f"{app_label}.views")
and getattr(view, "table", None) is cls.table
and getattr(view, "queryset", None) is not None
and issubclass(view, cls.queryset_source_view_classes)
)
@classmethod
def get_queryset_sources(cls):
"""
Return iterable of (label, queryset) pairs to test.
The label is included in the subtest failure output.
By default, only discover list-style views that declare this table.
That keeps bulk edit/delete confirmation tables out of the ordering
smoke test.
"""
if cls.queryset_sources is not None:
return tuple(cls.queryset_sources)
model = cls.table._meta.model
app_label = model._meta.app_label
module = import_module(f"{app_label}.views")
sources = []
for _, view in inspect.getmembers(module, inspect.isclass):
if not cls.is_queryset_source_view(view):
continue
queryset = view.queryset
if hasattr(queryset, "all"):
queryset = queryset.all()
sources.append((view.__name__, queryset))
if not sources:
raise AssertionError(
f"{cls.__name__} could not find any list-style queryset source for "
f"{cls.table.__module__}.{cls.table.__name__}; "
"set `queryset_sources` explicitly if needed."
)
return tuple(sources)
def iter_orderable_columns(self, queryset):
"""
Yield the names of all orderable columns for *queryset*, excluding
any listed in ``excluded_orderable_columns``.
"""
for column in self.get_table(queryset).columns:
if not column.orderable:
continue
if column.name in self.excluded_orderable_columns:
continue
yield column.name
class TableTestCases:
"""
Keep test_* methods nested to avoid unittest auto-discovering the reusable
base classes directly.
"""
class StandardTableTestCase(ModelTableTestCase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.validate_table_test_case()
def test_every_orderable_column_renders(self):
"""
Verify that each declared ordering can be applied without error.
This is intentionally a smoke test. It validates ordering against
the configured queryset sources but does not create model
instances by default, so it complements rather than replaces
data-backed rendering tests for tables whose behavior depends on
populated querysets.
"""
request = self.get_request()
for source_name, queryset in self.get_queryset_sources():
for column_name in self.iter_orderable_columns(queryset):
for direction, prefix in (("asc", ""), ("desc", "-")):
with self.subTest(source=source_name, column=column_name, direction=direction):
table = self.get_table(queryset)
table.order_by = f"{prefix}{column_name}"
table.as_html(request)

View File

@@ -1,9 +1,10 @@
from unittest.mock import patch
from django.template.loader import render_to_string
from django.test import TestCase, override_settings
from utilities.templatetags.builtins.tags import static_with_params
from utilities.templatetags.helpers import _humanize_capacity
from utilities.templatetags.builtins.tags import badge, static_with_params
from utilities.templatetags.helpers import _humanize_capacity, humanize_speed
class StaticWithParamsTest(TestCase):
@@ -49,6 +50,20 @@ class StaticWithParamsTest(TestCase):
self.assertNotIn('v=old_version', result)
class BadgeTest(TestCase):
"""
Test the badge template tag functionality.
"""
def test_badge_with_hex_color_and_url(self):
html = render_to_string('builtins/badge.html', badge('Role', hex_color='ff0000', url='/dcim/device-roles/1/'))
self.assertIn('href="/dcim/device-roles/1/"', html)
self.assertIn('background-color: #ff0000', html)
self.assertIn('color: #ffffff', html)
self.assertIn('>Role<', html)
class HumanizeCapacityTest(TestCase):
"""
Test the _humanize_capacity function for correct SI/IEC unit label selection.
@@ -90,3 +105,87 @@ class HumanizeCapacityTest(TestCase):
def test_default_divisor_is_1000(self):
self.assertEqual(_humanize_capacity(2000), '2.00 GB')
class HumanizeSpeedTest(TestCase):
"""
Test the humanize_speed filter for correct unit selection and decimal formatting.
"""
# Falsy / empty inputs
def test_none(self):
self.assertEqual(humanize_speed(None), '')
def test_zero(self):
self.assertEqual(humanize_speed(0), '')
def test_empty_string(self):
self.assertEqual(humanize_speed(''), '')
# Kbps (below 1000)
def test_kbps(self):
self.assertEqual(humanize_speed(100), '100 Kbps')
def test_kbps_low(self):
self.assertEqual(humanize_speed(1), '1 Kbps')
# Mbps (1,000 999,999)
def test_mbps_whole(self):
self.assertEqual(humanize_speed(100_000), '100 Mbps')
def test_mbps_decimal(self):
self.assertEqual(humanize_speed(1_544), '1.544 Mbps')
def test_mbps_10(self):
self.assertEqual(humanize_speed(10_000), '10 Mbps')
# Gbps (1,000,000 999,999,999)
def test_gbps_whole(self):
self.assertEqual(humanize_speed(1_000_000), '1 Gbps')
def test_gbps_decimal(self):
self.assertEqual(humanize_speed(2_500_000), '2.5 Gbps')
def test_gbps_10(self):
self.assertEqual(humanize_speed(10_000_000), '10 Gbps')
def test_gbps_25(self):
self.assertEqual(humanize_speed(25_000_000), '25 Gbps')
def test_gbps_40(self):
self.assertEqual(humanize_speed(40_000_000), '40 Gbps')
def test_gbps_100(self):
self.assertEqual(humanize_speed(100_000_000), '100 Gbps')
def test_gbps_400(self):
self.assertEqual(humanize_speed(400_000_000), '400 Gbps')
def test_gbps_800(self):
self.assertEqual(humanize_speed(800_000_000), '800 Gbps')
# Tbps (1,000,000,000+)
def test_tbps_whole(self):
self.assertEqual(humanize_speed(1_000_000_000), '1 Tbps')
def test_tbps_decimal(self):
self.assertEqual(humanize_speed(1_600_000_000), '1.6 Tbps')
# Edge cases
def test_string_input(self):
"""Ensure string values are cast to int correctly."""
self.assertEqual(humanize_speed('2500000'), '2.5 Gbps')
def test_non_round_remainder_preserved(self):
"""Ensure fractional parts with interior zeros are preserved."""
self.assertEqual(humanize_speed(1_001_000), '1.001 Gbps')
def test_trailing_zeros_stripped(self):
"""Ensure trailing fractional zeros are stripped (5.500 → 5.5)."""
self.assertEqual(humanize_speed(5_500_000), '5.5 Gbps')

View File

@@ -0,0 +1,26 @@
from utilities.testing import TableTestCases
from virtualization.tables import *
class ClusterTypeTableTest(TableTestCases.StandardTableTestCase):
table = ClusterTypeTable
class ClusterGroupTableTest(TableTestCases.StandardTableTestCase):
table = ClusterGroupTable
class ClusterTableTest(TableTestCases.StandardTableTestCase):
table = ClusterTable
class VirtualMachineTableTest(TableTestCases.StandardTableTestCase):
table = VirtualMachineTable
class VMInterfaceTableTest(TableTestCases.StandardTableTestCase):
table = VMInterfaceTable
class VirtualDiskTableTest(TableTestCases.StandardTableTestCase):
table = VirtualDiskTable

View File

@@ -17,7 +17,7 @@ class VirtualMachinePanel(panels.ObjectAttributesPanel):
name = attrs.TextAttr('name')
status = attrs.ChoiceAttr('status')
start_on_boot = attrs.ChoiceAttr('start_on_boot')
role = attrs.RelatedObjectAttr('role', linkify=True)
role = attrs.RelatedObjectAttr('role', linkify=True, colored=True)
platform = attrs.NestedObjectAttr('platform', linkify=True, max_depth=3)
description = attrs.TextAttr('description')
serial = attrs.TextAttr('serial', label=_('Serial number'), style='font-monospace', copy_button=True)

View File

@@ -1,23 +1,42 @@
from django.test import RequestFactory, TestCase, tag
from vpn.models import TunnelTermination
from vpn.tables import TunnelTerminationTable
from utilities.testing import TableTestCases
from vpn.tables import *
@tag('regression')
class TunnelTerminationTableTest(TestCase):
def test_every_orderable_field_does_not_throw_exception(self):
terminations = TunnelTermination.objects.all()
fake_request = RequestFactory().get("/")
disallowed = {'actions'}
class TunnelGroupTableTest(TableTestCases.StandardTableTestCase):
table = TunnelGroupTable
orderable_columns = [
column.name for column in TunnelTerminationTable(terminations).columns
if column.orderable and column.name not in disallowed
]
for col in orderable_columns:
for dir in ('-', ''):
table = TunnelTerminationTable(terminations)
table.order_by = f'{dir}{col}'
table.as_html(fake_request)
class TunnelTableTest(TableTestCases.StandardTableTestCase):
table = TunnelTable
class TunnelTerminationTableTest(TableTestCases.StandardTableTestCase):
table = TunnelTerminationTable
class IKEProposalTableTest(TableTestCases.StandardTableTestCase):
table = IKEProposalTable
class IKEPolicyTableTest(TableTestCases.StandardTableTestCase):
table = IKEPolicyTable
class IPSecProposalTableTest(TableTestCases.StandardTableTestCase):
table = IPSecProposalTable
class IPSecPolicyTableTest(TableTestCases.StandardTableTestCase):
table = IPSecPolicyTable
class IPSecProfileTableTest(TableTestCases.StandardTableTestCase):
table = IPSecProfileTable
class L2VPNTableTest(TableTestCases.StandardTableTestCase):
table = L2VPNTable
class L2VPNTerminationTableTest(TableTestCases.StandardTableTestCase):
table = L2VPNTerminationTable

View File

@@ -0,0 +1,14 @@
from utilities.testing import TableTestCases
from wireless.tables import *
class WirelessLANGroupTableTest(TableTestCases.StandardTableTestCase):
table = WirelessLANGroupTable
class WirelessLANTableTest(TableTestCases.StandardTableTestCase):
table = WirelessLANTable
class WirelessLinkTableTest(TableTestCases.StandardTableTestCase):
table = WirelessLinkTable