Compare commits
11 Commits

Author SHA1 Message Date
Martin Hauser
209c60ea6e test(tables): Add reusable OrderableColumnsTestCase
Introduce `TableTestCases.OrderableColumnsTestCase`, a shared base class
that automatically discovers sortable columns from list-view querysets
and verifies each renders without exceptions in both ascending and
descending order.

Add per-table smoke tests across circuits, core, dcim, extras, ipam,
tenancy, users, virtualization, vpn, and wireless apps.

Fixes #21766
2026-04-03 15:01:57 +02:00
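The discovery-and-render loop this test case performs can be sketched framework-free. The names below (`Column`, `iter_order_by_values`, the `actions` exclusion) are illustrative, not the actual `TableTestCases.OrderableColumnsTestCase` API:

```python
from dataclasses import dataclass


@dataclass
class Column:
    name: str
    orderable: bool


def iter_order_by_values(columns, disallowed=frozenset({'actions'})):
    """Yield every order_by string a smoke test should exercise:
    each sortable column, ascending ('') and descending ('-')."""
    for column in columns:
        if not column.orderable or column.name in disallowed:
            continue
        for direction in ('', '-'):
            yield f'{direction}{column.name}'


cols = [Column('name', True), Column('status', True), Column('actions', True)]
print(list(iter_order_by_values(cols)))
# ['name', '-name', 'status', '-status']
```

In the real test case, each yielded value would be assigned to `table.order_by` before rendering the table, so any broken accessor or field reference surfaces as an exception.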
github-actions
f058ee3d60 Update source translation strings 2026-04-03 05:31:13 +00:00
bctiemann
49ba0dd495 Fix filtering of object-type custom fields when "is empty" is selected (#21829) 2026-04-02 16:17:49 -07:00
Martin Hauser
b4ee2cf447 fix(dcim): Refresh stale CablePath references during serialization (#21815)
Cable edits can delete and recreate CablePath rows while endpoint
instances remain in memory. Deferred event serialization can then
encounter a stale `_path` reference and raise `CablePath.DoesNotExist`.

Refresh stale `_path` references through `PathEndpoint.path` and route
internal callers through that accessor. Update `EventContext` to track
the latest serialization source for coalesced duplicate enqueues, while
eagerly freezing delete-event payloads before row removal.

Also avoid mutating `event_rule.action_data` when merging the event
payload.

Fixes #21498
2026-04-02 15:49:42 -07:00
Jason Novinger
34098bb20a Fixes #21760: Add 1C2P:2C1P breakout cable profile (#21824)
* Add Breakout1C2Px2C1PCableProfile class
* Add BREAKOUT_1C2P_2C1P choice
* Add new CableProfileChoices (BREAKOUT_1C2P_2C1P)

---------

Co-authored-by: Paulo Santos <paulo.banon@gmail.com>
2026-04-02 23:33:35 +02:00
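A breakout profile's position map (like the `_mapping` added in the diff below) can be read as a lookup table from one end's (connector, position) pair to the far end's. The exact semantics of NetBox's `BaseCableProfile._mapping` are assumed here; this is only an illustration of the lookup:

```python
def map_position(mapping, connector, position):
    """Translate a (connector, position) pair through a profile's
    position map; returns None when the pair is unmapped."""
    return mapping.get((connector, position))


# Shaped like the 1C2P:2C1P breakout's A-to-B entries (illustrative only):
# one 2-position connector on the A side fans out to two 1-position
# connectors on the B side.
BREAKOUT_1C2P_2C1P = {
    (1, 1): (1, 1),
    (1, 2): (2, 1),
}

print(map_position(BREAKOUT_1C2P_2C1P, 1, 2))  # (2, 1)
```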
Jonathan Senecal
a19daa5466 Fixes #21095: Add IEC unit labels support and rename humanize helpers to be unit-agnostic (#21789) 2026-04-02 14:30:49 -07:00
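The SI-versus-IEC distinction behind this change is that SI units (kB, MB) scale by 1000 while IEC units (KiB, MiB) scale by 1024. A minimal sketch of a unit-agnostic humanizer follows; the actual renamed helpers in the PR are not reproduced here:

```python
def humanize_bytes(value, iec=False):
    """Render a byte count with SI (1000-based: kB, MB, ...) or
    IEC (1024-based: KiB, MiB, ...) unit labels."""
    base = 1024 if iec else 1000
    prefixes = ('Ki', 'Mi', 'Gi', 'Ti') if iec else ('k', 'M', 'G', 'T')
    if value < base:
        return f'{value} B'
    for i, prefix in enumerate(prefixes, start=1):
        scaled = value / base ** i
        if scaled < base or i == len(prefixes):
            return f'{scaled:g} {prefix}B'


print(humanize_bytes(1536))            # 1.536 kB
print(humanize_bytes(1536, iec=True))  # 1.5 KiB
```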
bctiemann
40eec679d9 Fixes: #21696 - Upgrade to django-rq==4.0.1 (#21805) 2026-04-02 14:09:53 -07:00
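The diffs for this upgrade replace the module-level `QUEUES_LIST`/`QUEUES_MAP` constants with `get_queues_list()`/`get_queues_map()` accessors. The underlying pattern is general: a constant freezes configuration at import time, while an accessor re-reads it on every call. A stdlib-only sketch (all names illustrative, not django-rq internals):

```python
# Simulated settings: a constant computed at "import" time vs. a lazy
# accessor that re-reads the live configuration on each call.
_config = {'queues': ['default']}

QUEUES_LIST = list(_config['queues'])      # snapshot, frozen at import


def get_queues_list():
    return list(_config['queues'])         # re-read on every call


_config['queues'].append('high')           # configuration changes later

print(QUEUES_LIST)        # ['default']  (stale snapshot)
print(get_queues_list())  # ['default', 'high']
```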
Martin Hauser
57556e3fdb fix(tables): Correct sortable column definitions across tables
Fix broken sorting metadata caused by incorrect accessors, field
references, and naming mismatches in several table definitions.

Update accessor paths for provider_account and device order_by; add
order_by mapping for the is_active property column; correct field name
typos such as termination_count to terminations_count; rename the
ssl_validation column to ssl_verification to match the model field; and
mark computed columns as orderable=False where sorting is not supported.

Fixes #21825
2026-04-02 16:20:53 -04:00
Arthur Hanson
f2d8ae29c2 21701 Allow scripts to be uploaded via post to API (#21756)
* #21701 allow upload script via API

* #21701 allow upload script via API

* add extra test

* change to use Script api endpoint

* ruff fix

* review feedback:

* review feedback:

* review feedback:

* Fix permission check, perform_create delegation, and test mock setup

- destroy() now checks extras.delete_script (queryset is Script.objects.all())
- create() delegates to self.perform_create() instead of calling serializer.save() directly
- Add comment explaining why update/partial_update intentionally return 405
- Fix test_upload_script_module: set mock_storage.save.return_value so file_path
  receives a real string after the _save_upload return-value fix; add DB existence check

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Return 400 instead of 500 on duplicate script module upload

Catch IntegrityError from the unique (file_root, file_path) constraint
and re-raise as a ValidationError so the API returns a 400 with a clear
message rather than a 500.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Validate upload_file + data_source conflict for multipart requests

DRF 3.16 Serializer.get_value() uses parse_html_dict() or empty for all
HTML/multipart input. A flat key like data_source=2 produces an empty
dict ({}), which is falsy, so it falls back to empty and the nested
field is silently skipped. data.get('data_source') is therefore always
None in multipart requests, bypassing the conflict check.

Fix: also check self.initial_data for data_source and data_file in all
three guards in validate(), so the raw submitted value is detected even
when DRF's HTML parser drops the deserialized object.

Add test_upload_with_data_source_fails to cover the multipart conflict
path explicitly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
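The guard described above can be sketched without DRF: when validating multipart input, check the raw submitted keys as well as the parsed ones, since the HTML parser may have dropped the nested field. Function and message names below are illustrative, not the actual serializer code:

```python
def find_conflict(data, initial_data):
    """Detect an upload_file/data_source conflict even when the HTML
    parser dropped the nested field from validated `data`, by also
    consulting the raw submitted `initial_data`."""
    def present(key):
        return data.get(key) is not None or initial_data.get(key) is not None

    if present('upload_file') and present('data_source'):
        return 'An uploaded file cannot be used with a data source.'
    return None


# Multipart case: DRF left data_source out of the parsed data, but the
# raw submitted value is still visible in initial_data.
print(find_conflict(
    {'upload_file': 'x.py', 'data_source': None},
    {'upload_file': 'x.py', 'data_source': '2'},
))
```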

* Require data_file when data_source is specified

data_source alone is not a valid creation payload — a data_file must
also be provided to identify which file within the source to sync.
Add the corresponding validation error and a test to cover the case.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Align ManagedFileForm validation with API serializer rules

Add the missing checks to ManagedFileForm.clean():
- upload_file + data_source is rejected (matches API)
- data_source without data_file is rejected with a specific message
- Update the 'nothing provided' error to mention data source + data file

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Revert "Align ManagedFileForm validation with API serializer rules"

This reverts commit f0ac7c3bd2.

* Align API validation messages with UI; restore complete checks

- Match UI error messages for upload+data_file conflict and no-source case
- Keep API-only guards for upload+data_source and data_source-without-data_file
- Restore test_upload_with_data_source_fails

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Run source/file conflict checks before super().validate() / full_clean()

super().validate() calls full_clean() on the model instance, which raises
a unique-constraint error for (file_root, file_path) when file_path is
empty (e.g. data_source-only requests). Move the conflict guards above the
super() call so they produce clear, actionable error messages before
full_clean() has a chance to surface confusing database-level errors.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* destroy() deletes ScriptModule, not Script

DELETE /api/extras/scripts/<pk>/ now deletes the entire ScriptModule
(matching the UI's delete view), including modules with no Script
children (e.g. sync hasn't run yet). Permission check updated to
delete_scriptmodule. The queryset restriction for destroy is removed
since the module is deleted via script.module, not super().destroy().

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* review feedback:

* cleanup

* cleanup

* cleanup

* cleanup

* change to ScriptModule

* change to ScriptModule

* change to ScriptModule

* update docs

* cleanup

* restore file

* cleanup

* cleanup

* cleanup

* cleanup

* cleanup

* keep only upload functionality

* cleanup

* cleanup

* cleanup

* change to scripts/upload api

* cleanup

* cleanup

* cleanup

* cleanup

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-02 08:42:14 -04:00
github-actions
f6eb5dda0f Update source translation strings 2026-04-02 05:30:39 +00:00
Mark Robert Coleman
c7bbfb24c5 Fix single {module} token rejection at nested module bay depth (#21740)
* Fix single {module} token rejection at nested depth (#20474)

A module type with a single {module} placeholder in component template
names could not be installed in a nested module bay (depth > 1) because
the form validation required an exact match between the token count and
the tree depth. This resolves the issue by treating a single {module}
token as a reference to the immediate parent bay's position, regardless
of nesting depth. Multi-token behavior is unchanged.

Refactors resolve_name() and resolve_label() into a shared
_resolve_module_placeholder() helper to eliminate duplication.

Fixes: #20474

* Address review feedback for PR #21740 (fixes #20474)

- Rebase on latest main to resolve merge conflicts
- Extract shared module bay traversal and {module} token resolution
  into dcim/utils.py (get_module_bay_positions, resolve_module_placeholder)
- Update ModuleCommonForm, ModularComponentTemplateModel, and
  ModuleBayTemplate to use shared utility functions
- Add {module} token validation to ModuleSerializer.validate() so the
  API enforces the same rules as the UI form
- Remove duplicated _get_module_bay_tree (form) and _get_module_tree
  (model) methods in favor of the shared routine
2026-04-01 16:19:43 -07:00
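The resolution rule described above can be sketched as follows. Given bay positions ordered from the outermost bay down to the immediate parent, a single `{module}` token takes the last (immediate parent) position at any depth, while multiple tokens must match the depth exactly. The real helper lives in `dcim/utils.py`; this standalone sketch only models the behavior the commit message describes:

```python
MODULE_TOKEN = '{module}'


def resolve_module_placeholder(name, positions):
    """Resolve {module} tokens against bay positions (outermost first).
    One token: use the immediate parent's position, regardless of depth.
    Several tokens: substitute one per level; counts must match."""
    count = name.count(MODULE_TOKEN)
    if count == 0:
        return name
    if count == 1:
        return name.replace(MODULE_TOKEN, positions[-1])
    if count != len(positions):
        raise ValueError(
            f'{len(positions)} bay level(s) in tree but {count} placeholders given'
        )
    for position in positions:
        name = name.replace(MODULE_TOKEN, position, 1)
    return name


print(resolve_module_placeholder('Gi{module}/0/1', ['2', '3']))        # Gi3/0/1
print(resolve_module_placeholder('Gi{module}/{module}/1', ['2', '3'])) # Gi2/3/1
```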
58 changed files with 2232 additions and 874 deletions

View File

@@ -384,6 +384,18 @@ A calendar date. Returns a `datetime.date` object.
A complete date & time. Returns a `datetime.datetime` object.
## Uploading Scripts via the API
Script modules can be uploaded to NetBox via the REST API by sending a `multipart/form-data` POST request to `/api/extras/scripts/upload/`. The caller must have the `extras.add_scriptmodule` and `core.add_managedfile` permissions.
```no-highlight
curl -X POST \
-H "Authorization: Token $TOKEN" \
-H "Accept: application/json; indent=4" \
-F "file=@/path/to/myscript.py" \
http://netbox/api/extras/scripts/upload/
```
## Running Custom Scripts
!!! note

View File

@@ -95,6 +95,7 @@ class VirtualCircuitTerminationTable(NetBoxTable):
verbose_name=_('Provider network')
)
provider_account = tables.Column(
accessor=tables.A('virtual_circuit__provider_account'),
linkify=True,
verbose_name=_('Account')
)
@@ -112,7 +113,7 @@ class VirtualCircuitTerminationTable(NetBoxTable):
class Meta(NetBoxTable.Meta):
model = VirtualCircuitTermination
fields = (
'pk', 'id', 'virtual_circuit', 'provider', 'provider_network', 'provider_account', 'role', 'interfaces',
'pk', 'id', 'virtual_circuit', 'provider', 'provider_network', 'provider_account', 'role', 'interface',
'description', 'created', 'last_updated', 'actions',
)
default_columns = (

View File

@@ -1,48 +1,46 @@
from django.test import RequestFactory, TestCase, tag
from circuits.models import CircuitGroupAssignment, CircuitTermination
from circuits.tables import CircuitGroupAssignmentTable, CircuitTerminationTable
from circuits.tables import *
from utilities.testing import TableTestCases
@tag('regression')
class CircuitTerminationTableTest(TestCase):
def test_every_orderable_field_does_not_throw_exception(self):
terminations = CircuitTermination.objects.all()
disallowed = {
'actions',
}
orderable_columns = [
column.name
for column in CircuitTerminationTable(terminations).columns
if column.orderable and column.name not in disallowed
]
fake_request = RequestFactory().get('/')
for col in orderable_columns:
for direction in ('-', ''):
table = CircuitTerminationTable(terminations)
table.order_by = f'{direction}{col}'
table.as_html(fake_request)
class CircuitTypeTableTest(TableTestCases.OrderableColumnsTestCase):
table = CircuitTypeTable
@tag('regression')
class CircuitGroupAssignmentTableTest(TestCase):
def test_every_orderable_field_does_not_throw_exception(self):
assignment = CircuitGroupAssignment.objects.all()
disallowed = {
'actions',
}
class CircuitTableTest(TableTestCases.OrderableColumnsTestCase):
table = CircuitTable
orderable_columns = [
column.name
for column in CircuitGroupAssignmentTable(assignment).columns
if column.orderable and column.name not in disallowed
]
fake_request = RequestFactory().get('/')
for col in orderable_columns:
for direction in ('-', ''):
table = CircuitGroupAssignmentTable(assignment)
table.order_by = f'{direction}{col}'
table.as_html(fake_request)
class CircuitTerminationTableTest(TableTestCases.OrderableColumnsTestCase):
table = CircuitTerminationTable
class CircuitGroupTableTest(TableTestCases.OrderableColumnsTestCase):
table = CircuitGroupTable
class CircuitGroupAssignmentTableTest(TableTestCases.OrderableColumnsTestCase):
table = CircuitGroupAssignmentTable
class ProviderTableTest(TableTestCases.OrderableColumnsTestCase):
table = ProviderTable
class ProviderAccountTableTest(TableTestCases.OrderableColumnsTestCase):
table = ProviderAccountTable
class ProviderNetworkTableTest(TableTestCases.OrderableColumnsTestCase):
table = ProviderNetworkTable
class VirtualCircuitTypeTableTest(TableTestCases.OrderableColumnsTestCase):
table = VirtualCircuitTypeTable
class VirtualCircuitTableTest(TableTestCases.OrderableColumnsTestCase):
table = VirtualCircuitTable
class VirtualCircuitTerminationTableTest(TableTestCases.OrderableColumnsTestCase):
table = VirtualCircuitTerminationTable

View File

@@ -2,7 +2,7 @@ from django.http import Http404, HttpResponse
from django.shortcuts import get_object_or_404
from django.utils.translation import gettext_lazy as _
from django_rq.queues import get_redis_connection
from django_rq.settings import QUEUES_LIST
from django_rq.settings import get_queues_list
from django_rq.utils import get_statistics
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import OpenApiParameter, extend_schema
@@ -195,7 +195,7 @@ class BackgroundWorkerViewSet(BaseRQViewSet):
return 'Background Workers'
def get_data(self):
config = QUEUES_LIST[0]
config = get_queues_list()[0]
return Worker.all(get_redis_connection(config['connection_config']))
@extend_schema(
@@ -205,7 +205,7 @@ class BackgroundWorkerViewSet(BaseRQViewSet):
)
def retrieve(self, request, name):
# all the RQ queues should use the same connection
config = QUEUES_LIST[0]
config = get_queues_list()[0]
workers = Worker.all(get_redis_connection(config['connection_config']))
worker = next((item for item in workers if item.name == name), None)
if not worker:
@@ -229,7 +229,7 @@ class BackgroundTaskViewSet(BaseRQViewSet):
return get_rq_jobs()
def get_task_from_id(self, task_id):
config = QUEUES_LIST[0]
config = get_queues_list()[0]
task = RQ_Job.fetch(task_id, connection=get_redis_connection(config['connection_config']))
if not task:
raise Http404

View File

@@ -19,6 +19,7 @@ REVISION_BUTTONS = """
class ConfigRevisionTable(NetBoxTable):
is_active = columns.BooleanColumn(
verbose_name=_('Is Active'),
accessor='active',
false_mark=None
)
actions = columns.ActionsColumn(

View File

@@ -0,0 +1,26 @@
from core.models import ObjectChange
from core.tables import *
from utilities.testing import TableTestCases
class DataSourceTableTest(TableTestCases.OrderableColumnsTestCase):
table = DataSourceTable
class DataFileTableTest(TableTestCases.OrderableColumnsTestCase):
table = DataFileTable
class JobTableTest(TableTestCases.OrderableColumnsTestCase):
table = JobTable
class ObjectChangeTableTest(TableTestCases.OrderableColumnsTestCase):
table = ObjectChangeTable
queryset_sources = [
('ObjectChangeListView', ObjectChange.objects.valid_models()),
]
class ConfigRevisionTableTest(TableTestCases.OrderableColumnsTestCase):
table = ConfigRevisionTable

View File

@@ -6,7 +6,7 @@ from datetime import datetime
from django.urls import reverse
from django.utils import timezone
from django_rq import get_queue
from django_rq.settings import QUEUES_MAP
from django_rq.settings import get_queues_map
from django_rq.workers import get_worker
from rq.job import Job as RQ_Job
from rq.job import JobStatus
@@ -189,7 +189,7 @@ class BackgroundTaskTestCase(TestCase):
def test_background_tasks_list_default(self):
queue = get_queue('default')
queue.enqueue(self.dummy_job_default)
queue_index = QUEUES_MAP['default']
queue_index = get_queues_map()['default']
response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'queued']))
self.assertEqual(response.status_code, 200)
@@ -198,7 +198,7 @@ class BackgroundTaskTestCase(TestCase):
def test_background_tasks_list_high(self):
queue = get_queue('high')
queue.enqueue(self.dummy_job_high)
queue_index = QUEUES_MAP['high']
queue_index = get_queues_map()['high']
response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'queued']))
self.assertEqual(response.status_code, 200)
@@ -207,7 +207,7 @@ class BackgroundTaskTestCase(TestCase):
def test_background_tasks_list_finished(self):
queue = get_queue('default')
job = queue.enqueue(self.dummy_job_default)
queue_index = QUEUES_MAP['default']
queue_index = get_queues_map()['default']
registry = FinishedJobRegistry(queue.name, queue.connection)
registry.add(job, 2)
@@ -218,7 +218,7 @@ class BackgroundTaskTestCase(TestCase):
def test_background_tasks_list_failed(self):
queue = get_queue('default')
job = queue.enqueue(self.dummy_job_default)
queue_index = QUEUES_MAP['default']
queue_index = get_queues_map()['default']
registry = FailedJobRegistry(queue.name, queue.connection)
registry.add(job, 2)
@@ -229,7 +229,7 @@ class BackgroundTaskTestCase(TestCase):
def test_background_tasks_scheduled(self):
queue = get_queue('default')
queue.enqueue_at(datetime.now(), self.dummy_job_default)
queue_index = QUEUES_MAP['default']
queue_index = get_queues_map()['default']
response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'scheduled']))
self.assertEqual(response.status_code, 200)
@@ -238,7 +238,7 @@ class BackgroundTaskTestCase(TestCase):
def test_background_tasks_list_deferred(self):
queue = get_queue('default')
job = queue.enqueue(self.dummy_job_default)
queue_index = QUEUES_MAP['default']
queue_index = get_queues_map()['default']
registry = DeferredJobRegistry(queue.name, queue.connection)
registry.add(job, 2)
@@ -335,7 +335,7 @@ class BackgroundTaskTestCase(TestCase):
worker2 = get_worker('high')
worker2.register_birth()
queue_index = QUEUES_MAP['default']
queue_index = get_queues_map()['default']
response = self.client.get(reverse('core:worker_list', args=[queue_index]))
self.assertEqual(response.status_code, 200)
self.assertIn(str(worker1.name), str(response.content))

View File

@@ -1,7 +1,7 @@
from django.http import Http404
from django.utils.translation import gettext_lazy as _
from django_rq.queues import get_queue, get_queue_by_index, get_redis_connection
from django_rq.settings import QUEUES_LIST, QUEUES_MAP
from django_rq.settings import get_queues_list, get_queues_map
from django_rq.utils import get_jobs, stop_jobs
from rq import requeue_job
from rq.exceptions import NoSuchJobError
@@ -31,7 +31,7 @@ def get_rq_jobs():
"""
jobs = set()
for queue in QUEUES_LIST:
for queue in get_queues_list():
queue = get_queue(queue['name'])
jobs.update(queue.get_jobs())
@@ -78,13 +78,13 @@ def delete_rq_job(job_id):
"""
Delete the specified RQ job.
"""
config = QUEUES_LIST[0]
config = get_queues_list()[0]
try:
job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
except NoSuchJobError:
raise Http404(_("Job {job_id} not found").format(job_id=job_id))
queue_index = QUEUES_MAP[job.origin]
queue_index = get_queues_map()[job.origin]
queue = get_queue_by_index(queue_index)
# Remove job id from queue and delete the actual job
@@ -96,13 +96,13 @@ def requeue_rq_job(job_id):
"""
Requeue the specified RQ job.
"""
config = QUEUES_LIST[0]
config = get_queues_list()[0]
try:
job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
except NoSuchJobError:
raise Http404(_("Job {id} not found.").format(id=job_id))
queue_index = QUEUES_MAP[job.origin]
queue_index = get_queues_map()[job.origin]
queue = get_queue_by_index(queue_index)
requeue_job(job_id, connection=queue.connection, serializer=queue.serializer)
@@ -112,13 +112,13 @@ def enqueue_rq_job(job_id):
"""
Enqueue the specified RQ job.
"""
config = QUEUES_LIST[0]
config = get_queues_list()[0]
try:
job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
except NoSuchJobError:
raise Http404(_("Job {id} not found.").format(id=job_id))
queue_index = QUEUES_MAP[job.origin]
queue_index = get_queues_map()[job.origin]
queue = get_queue_by_index(queue_index)
try:
@@ -144,13 +144,13 @@ def stop_rq_job(job_id):
"""
Stop the specified RQ job.
"""
config = QUEUES_LIST[0]
config = get_queues_list()[0]
try:
job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
except NoSuchJobError:
raise Http404(_("Job {job_id} not found").format(job_id=job_id))
queue_index = QUEUES_MAP[job.origin]
queue_index = get_queues_map()[job.origin]
queue = get_queue_by_index(queue_index)
return stop_jobs(queue, job_id)[0]

View File

@@ -14,7 +14,7 @@ from django.urls import reverse
from django.utils.translation import gettext_lazy as _
from django.views.generic import View
from django_rq.queues import get_connection, get_queue_by_index, get_redis_connection
from django_rq.settings import QUEUES_LIST, QUEUES_MAP
from django_rq.settings import get_queues_list, get_queues_map
from django_rq.utils import get_statistics
from rq.exceptions import NoSuchJobError
from rq.job import Job as RQ_Job
@@ -524,13 +524,13 @@ class BackgroundTaskView(BaseRQView):
def get(self, request, job_id):
# all the RQ queues should use the same connection
config = QUEUES_LIST[0]
config = get_queues_list()[0]
try:
job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
except NoSuchJobError:
raise Http404(_("Job {job_id} not found").format(job_id=job_id))
queue_index = QUEUES_MAP[job.origin]
queue_index = get_queues_map()[job.origin]
queue = get_queue_by_index(queue_index)
try:
@@ -640,7 +640,7 @@ class WorkerView(BaseRQView):
def get(self, request, key):
# all the RQ queues should use the same connection
config = QUEUES_LIST[0]
config = get_queues_list()[0]
worker = Worker.find_by_key('rq:worker:' + key, connection=get_redis_connection(config['connection_config']))
# Convert microseconds to milliseconds
worker.total_working_time = worker.total_working_time / 1000

View File

@@ -38,7 +38,15 @@ class ConnectedEndpointsSerializer(serializers.ModelSerializer):
@extend_schema_field(serializers.BooleanField)
def get_connected_endpoints_reachable(self, obj):
return obj._path and obj._path.is_complete and obj._path.is_active
"""
Return whether the connected endpoints are reachable via a complete, active cable path.
"""
# Use the public `path` accessor rather than dereferencing `_path`
# directly. `path` already handles the stale in-memory relation case
# that can occur while CablePath rows are rebuilt during cable edits.
if path := obj.path:
return path.is_complete and path.is_active
return False
class PortSerializer(serializers.ModelSerializer):

View File

@@ -6,8 +6,9 @@ from drf_spectacular.utils import extend_schema_field
from rest_framework import serializers
from dcim.choices import *
from dcim.constants import MACADDRESS_ASSIGNMENT_MODELS
from dcim.constants import MACADDRESS_ASSIGNMENT_MODELS, MODULE_TOKEN
from dcim.models import Device, DeviceBay, MACAddress, Module, VirtualDeviceContext
from dcim.utils import get_module_bay_positions, resolve_module_placeholder
from extras.api.serializers_.configtemplates import ConfigTemplateSerializer
from ipam.api.serializers_.ip import IPAddressSerializer
from netbox.api.fields import ChoiceField, ContentTypeField, RelatedObjectCountField
@@ -159,6 +160,60 @@ class ModuleSerializer(PrimaryModelSerializer):
]
brief_fields = ('id', 'url', 'display', 'device', 'module_bay', 'module_type', 'description')
def validate(self, data):
data = super().validate(data)
if self.nested:
return data
# Skip validation for existing modules (updates)
if self.instance is not None:
return data
module_bay = data.get('module_bay')
module_type = data.get('module_type')
device = data.get('device')
if not all((module_bay, module_type, device)):
return data
positions = get_module_bay_positions(module_bay)
for templates, component_attribute in [
("consoleporttemplates", "consoleports"),
("consoleserverporttemplates", "consoleserverports"),
("interfacetemplates", "interfaces"),
("powerporttemplates", "powerports"),
("poweroutlettemplates", "poweroutlets"),
("rearporttemplates", "rearports"),
("frontporttemplates", "frontports"),
]:
installed_components = {
component.name: component for component in getattr(device, component_attribute).all()
}
for template in getattr(module_type, templates).all():
resolved_name = template.name
if MODULE_TOKEN in template.name:
if not module_bay.position:
raise serializers.ValidationError(
_("Cannot install module with placeholder values in a module bay with no position defined.")
)
try:
resolved_name = resolve_module_placeholder(template.name, positions)
except ValueError as e:
raise serializers.ValidationError(str(e))
if resolved_name in installed_components:
raise serializers.ValidationError(
_("A {model} named {name} already exists").format(
model=template.component_model.__name__,
name=resolved_name
)
)
return data
class MACAddressSerializer(PrimaryModelSerializer):
assigned_object_type = ContentTypeField(

View File

@@ -254,6 +254,21 @@ class Trunk8C4PCableProfile(BaseCableProfile):
b_connectors = a_connectors
class Breakout1C2Px2C1PCableProfile(BaseCableProfile):
a_connectors = {
1: 2,
}
b_connectors = {
1: 1,
2: 1,
}
_mapping = {
(1, 1): (1, 1),
(1, 2): (2, 1),
(2, 1): (1, 2),
}
class Breakout1C4Px4C1PCableProfile(BaseCableProfile):
a_connectors = {
1: 4,

View File

@@ -1776,6 +1776,7 @@ class CableProfileChoices(ChoiceSet):
TRUNK_4C8P = 'trunk-4c8p'
TRUNK_8C4P = 'trunk-8c4p'
# Breakouts
BREAKOUT_1C2P_2C1P = 'breakout-1c2p-2c1p'
BREAKOUT_1C4P_4C1P = 'breakout-1c4p-4c1p'
BREAKOUT_1C6P_6C1P = 'breakout-1c6p-6c1p'
BREAKOUT_2C4P_8C1P_SHUFFLE = 'breakout-2c4p-8c1p-shuffle'
@@ -1815,6 +1816,7 @@ class CableProfileChoices(ChoiceSet):
(
_('Breakout'),
(
(BREAKOUT_1C2P_2C1P, _('1C2P:2C1P breakout')),
(BREAKOUT_1C4P_4C1P, _('1C4P:4C1P breakout')),
(BREAKOUT_1C6P_6C1P, _('1C6P:6C1P breakout')),
(BREAKOUT_2C4P_8C1P_SHUFFLE, _('2C4P:8C1P breakout (shuffle)')),

View File

@@ -3,6 +3,7 @@ from django.utils.translation import gettext_lazy as _
from dcim.choices import *
from dcim.constants import *
from dcim.utils import get_module_bay_positions, resolve_module_placeholder
from utilities.forms import get_field_value
__all__ = (
@@ -70,18 +71,6 @@ class InterfaceCommonForm(forms.Form):
class ModuleCommonForm(forms.Form):
def _get_module_bay_tree(self, module_bay):
module_bays = []
while module_bay:
module_bays.append(module_bay)
if module_bay.module:
module_bay = module_bay.module.module_bay
else:
module_bay = None
module_bays.reverse()
return module_bays
def clean(self):
super().clean()
@@ -100,7 +89,7 @@ class ModuleCommonForm(forms.Form):
self.instance._disable_replication = True
return
module_bays = self._get_module_bay_tree(module_bay)
positions = get_module_bay_positions(module_bay)
for templates, component_attribute in [
("consoleporttemplates", "consoleports"),
@@ -119,25 +108,16 @@ class ModuleCommonForm(forms.Form):
# Get the templates for the module type.
for template in getattr(module_type, templates).all():
resolved_name = template.name
# Installing modules with placeholders require that the bay has a position value
if MODULE_TOKEN in template.name:
if not module_bay.position:
raise forms.ValidationError(
_("Cannot install module with placeholder values in a module bay with no position defined.")
)
if len(module_bays) != template.name.count(MODULE_TOKEN):
raise forms.ValidationError(
_(
"Cannot install module with placeholder values in a module bay tree {level} in tree "
"but {tokens} placeholders given."
).format(
level=len(module_bays), tokens=template.name.count(MODULE_TOKEN)
)
)
for module_bay in module_bays:
resolved_name = resolved_name.replace(MODULE_TOKEN, module_bay.position, 1)
try:
resolved_name = resolve_module_placeholder(template.name, positions)
except ValueError as e:
raise forms.ValidationError(str(e))
existing_item = installed_components.get(resolved_name)

View File

@@ -160,6 +160,7 @@ class Cable(PrimaryModel):
CableProfileChoices.TRUNK_4C6P: cable_profiles.Trunk4C6PCableProfile,
CableProfileChoices.TRUNK_4C8P: cable_profiles.Trunk4C8PCableProfile,
CableProfileChoices.TRUNK_8C4P: cable_profiles.Trunk8C4PCableProfile,
CableProfileChoices.BREAKOUT_1C2P_2C1P: cable_profiles.Breakout1C2Px2C1PCableProfile,
CableProfileChoices.BREAKOUT_1C4P_4C1P: cable_profiles.Breakout1C4Px4C1PCableProfile,
CableProfileChoices.BREAKOUT_1C6P_6C1P: cable_profiles.Breakout1C6Px6C1PCableProfile,
CableProfileChoices.BREAKOUT_2C4P_8C1P_SHUFFLE: cable_profiles.Breakout2C4Px8C1PShuffleCableProfile,

View File

@@ -9,6 +9,7 @@ from dcim.choices import *
from dcim.constants import *
from dcim.models.base import PortMappingBase
from dcim.models.mixins import InterfaceValidationMixin
from dcim.utils import get_module_bay_positions, resolve_module_placeholder
from netbox.models import ChangeLoggedModel
from utilities.fields import ColorField, NaturalOrderingField
from utilities.mptt import TreeManager
@@ -165,31 +166,15 @@ class ModularComponentTemplateModel(ComponentTemplateModel):
_("A component template must be associated with either a device type or a module type.")
)
def _get_module_tree(self, module):
modules = []
while module:
modules.append(module)
if module.module_bay:
module = module.module_bay.module
else:
module = None
modules.reverse()
return modules
def _resolve_module_placeholder(self, value, module):
if MODULE_TOKEN not in value or not module:
return value
modules = self._get_module_tree(module)
for m in modules:
value = value.replace(MODULE_TOKEN, m.module_bay.position, 1)
return value
def resolve_name(self, module):
return self._resolve_module_placeholder(self.name, module)
if MODULE_TOKEN not in self.name or not module:
return self.name
return resolve_module_placeholder(self.name, get_module_bay_positions(module.module_bay))
def resolve_label(self, module):
return self._resolve_module_placeholder(self.label, module)
if MODULE_TOKEN not in self.label or not module:
return self.label
return resolve_module_placeholder(self.label, get_module_bay_positions(module.module_bay))
class ConsolePortTemplate(ModularComponentTemplateModel):
@@ -720,7 +705,9 @@ class ModuleBayTemplate(ModularComponentTemplateModel):
verbose_name_plural = _('module bay templates')
def resolve_position(self, module):
return self._resolve_module_placeholder(self.position, module)
if MODULE_TOKEN not in self.position or not module:
return self.position
return resolve_module_placeholder(self.position, get_module_bay_positions(module.module_bay))
def instantiate(self, **kwargs):
return self.component_model(

View File

@@ -2,7 +2,7 @@ from functools import cached_property
from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation
from django.contrib.postgres.fields import ArrayField
from django.core.exceptions import ValidationError
from django.core.exceptions import ObjectDoesNotExist, ValidationError
from django.core.validators import MaxValueValidator, MinValueValidator
from django.db import models
from django.db.models import Sum
@@ -307,11 +307,12 @@ class PathEndpoint(models.Model):
`connected_endpoints()` is a convenience method for returning the destination of the associated CablePath, if any.
"""
_path = models.ForeignKey(
to='dcim.CablePath',
on_delete=models.SET_NULL,
null=True,
blank=True
blank=True,
)
class Meta:
@@ -323,11 +324,14 @@ class PathEndpoint(models.Model):
# Construct the complete path (including e.g. bridged interfaces)
while origin is not None:
if origin._path is None:
# Go through the public accessor rather than dereferencing `_path`
# directly. During cable edits, CablePath rows can be deleted and
# recreated while this endpoint instance is still in memory.
cable_path = origin.path
if cable_path is None:
break
path.extend(origin._path.path_objects)
path.extend(cable_path.path_objects)
# If the path ends at a non-connected pass-through port, pad out the link and far-end terminations
if len(path) % 3 == 1:
@@ -336,8 +340,8 @@ class PathEndpoint(models.Model):
elif len(path) % 3 == 2:
path.insert(-1, [])
# Check for a bridged relationship to continue the trace
destinations = origin._path.destinations
# Check for a bridged relationship to continue the trace.
destinations = cable_path.destinations
if len(destinations) == 1:
origin = getattr(destinations[0], 'bridge', None)
else:
@@ -348,14 +352,42 @@ class PathEndpoint(models.Model):
@property
def path(self):
return self._path
"""
Return this endpoint's current CablePath, if any.
`_path` is a denormalized reference that is updated from CablePath
save/delete handlers, including queryset.update() calls on origin
endpoints. That means an already-instantiated endpoint can briefly hold
a stale in-memory `_path` relation while the database already points to
a different CablePath (or to no path at all).
If the cached relation points to a CablePath that has just been
deleted, refresh only the `_path` field from the database and retry.
This keeps the fix cheap and narrowly scoped to the denormalized FK.
"""
if self._path_id is None:
return None
try:
return self._path
except ObjectDoesNotExist:
# Refresh only the denormalized FK instead of the whole model.
# The expected problem here is in-memory staleness during path
# rebuilds, not persistent database corruption.
self.refresh_from_db(fields=['_path'])
return self._path if self._path_id else None
@cached_property
def connected_endpoints(self):
"""
Caching accessor for the attached CablePath's destination (if any)
Caching accessor for the attached CablePath's destinations (if any).
Always route through `path` so stale in-memory `_path` references are
repaired before we cache the result for the lifetime of this instance.
"""
return self._path.destinations if self._path else []
if cable_path := self.path:
return cable_path.destinations
return []
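The repaired `path` accessor boils down to a refresh-on-miss pattern for a denormalized foreign key: dereference the cached relation, and if the referenced row is gone, reload only the FK value and retry. A minimal Django-free sketch (all names here are illustrative, with a dict standing in for the CablePath table):

```python
class DoesNotExist(Exception):
    """Stand-in for CablePath.DoesNotExist."""


class Endpoint:
    """Django-free analog of PathEndpoint's stale-FK repair."""

    def __init__(self, paths, fk_in_db):
        self.paths = paths            # simulated CablePath table: pk -> object
        self.fk_in_db = fk_in_db      # simulated DB column holding the FK (1-element list)
        self._path_id = fk_in_db[0]   # in-memory copy; may go stale

    def _dereference(self):
        # Emulates the FK descriptor: raises if the referenced row is gone
        try:
            return self.paths[self._path_id]
        except KeyError:
            raise DoesNotExist from None

    def refresh_from_db(self):
        # Analog of refresh_from_db(fields=['_path']): reload only the FK value
        self._path_id = self.fk_in_db[0]

    @property
    def path(self):
        if self._path_id is None:
            return None
        try:
            return self._dereference()
        except DoesNotExist:
            # Stale in-memory reference: refresh the FK once and retry
            self.refresh_from_db()
            return self._dereference() if self._path_id is not None else None


# Simulate a path rebuild: row 1 deleted, row 2 created, DB-side FK updated,
# all while the endpoint instance still holds the old in-memory FK.
paths = {1: 'old path'}
fk = [1]
endpoint = Endpoint(paths, fk)
del paths[1]
paths[2] = 'new path'
fk[0] = 2
assert endpoint.path == 'new path'  # stale reference repaired transparently
```

The same shape is what the diff implements: the fix stays scoped to the one denormalized field rather than refreshing the whole model.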
#


@@ -1149,7 +1149,7 @@ class VirtualDeviceContextTable(TenancyColumnsMixin, PrimaryModelTable):
)
device = tables.Column(
verbose_name=_('Device'),
order_by=('device___name',),
order_by=('device__name',),
linkify=True
)
status = columns.ChoiceFieldColumn(


@@ -56,7 +56,9 @@ class ModuleTypeTable(PrimaryModelTable):
template_code=WEIGHT,
order_by=('_abs_weight', 'weight_unit')
)
attributes = columns.DictColumn()
attributes = columns.DictColumn(
orderable=False,
)
module_count = columns.LinkedCountColumn(
viewname='dcim:module_list',
url_params={'module_type_id': 'pk'},


@@ -5,6 +5,7 @@ from circuits.models import *
from core.models import ObjectType
from dcim.choices import *
from dcim.models import *
from extras.events import serialize_for_event
from extras.models import CustomField
from ipam.models import Prefix
from netbox.choices import WeightUnitChoices
@@ -893,6 +894,77 @@ class ModuleBayTestCase(TestCase):
nested_bay = module.modulebays.get(name='Sub-bay 1-1')
self.assertEqual(nested_bay.position, '1-1')
@tag('regression') # #20474
def test_single_module_token_at_nested_depth(self):
"""
A module type with a single {module} token should install at depth > 1
without raising a token count mismatch error, resolving to the immediate
parent bay's position.
"""
manufacturer = Manufacturer.objects.first()
site = Site.objects.first()
device_role = DeviceRole.objects.first()
device_type = DeviceType.objects.create(
manufacturer=manufacturer,
model='Chassis with Rear Card',
slug='chassis-with-rear-card'
)
ModuleBayTemplate.objects.create(
device_type=device_type,
name='Rear card slot',
position='1'
)
rear_card_type = ModuleType.objects.create(
manufacturer=manufacturer,
model='Rear Card'
)
ModuleBayTemplate.objects.create(
module_type=rear_card_type,
name='SFP slot 1',
position='1'
)
ModuleBayTemplate.objects.create(
module_type=rear_card_type,
name='SFP slot 2',
position='2'
)
sfp_type = ModuleType.objects.create(
manufacturer=manufacturer,
model='SFP Module'
)
InterfaceTemplate.objects.create(
module_type=sfp_type,
name='SFP {module}',
type=InterfaceTypeChoices.TYPE_10GE_SFP_PLUS
)
device = Device.objects.create(
name='Test Chassis',
device_type=device_type,
role=device_role,
site=site
)
rear_card_bay = device.modulebays.get(name='Rear card slot')
rear_card = Module.objects.create(
device=device,
module_bay=rear_card_bay,
module_type=rear_card_type
)
sfp_bay = rear_card.modulebays.get(name='SFP slot 2')
sfp_module = Module.objects.create(
device=device,
module_bay=sfp_bay,
module_type=sfp_type
)
interface = sfp_module.interfaces.first()
self.assertEqual(interface.name, 'SFP 2')
@tag('regression') # #20912
def test_module_bay_parent_cleared_when_module_removed(self):
"""Test that the parent field is properly cleared when a module bay's module assignment is removed"""
@@ -1274,6 +1346,65 @@ class CableTestCase(TestCase):
self.assertEqual(a_terms, [interface1])
self.assertEqual(b_terms, [interface2])
@tag('regression') # #21498
def test_path_refreshes_replaced_cablepath_reference(self):
"""
An already-instantiated interface should refresh its denormalized
`_path` foreign key when the referenced CablePath row has been
replaced in the database.
"""
stale_interface = Interface.objects.get(device__name='TestDevice1', name='eth0')
old_path = CablePath.objects.get(pk=stale_interface._path_id)
new_path = CablePath(
path=old_path.path,
is_active=old_path.is_active,
is_complete=old_path.is_complete,
is_split=old_path.is_split,
)
old_path_id = old_path.pk
old_path.delete()
new_path.save()
# The old CablePath no longer exists
self.assertFalse(CablePath.objects.filter(pk=old_path_id).exists())
# The already-instantiated interface still points to the deleted path
# until the accessor refreshes `_path` from the database.
self.assertEqual(stale_interface._path_id, old_path_id)
self.assertEqual(stale_interface.path.pk, new_path.pk)
@tag('regression') # #21498
def test_serialize_for_event_handles_stale_cablepath_reference_after_retermination(self):
"""
Serializing an interface whose previously cached `_path` row has been
deleted during cable retermination must not raise.
"""
stale_interface = Interface.objects.get(device__name='TestDevice2', name='eth0')
old_path_id = stale_interface._path_id
new_peer = Interface.objects.get(device__name='TestDevice2', name='eth1')
cable = stale_interface.cable
self.assertIsNotNone(cable)
self.assertIsNotNone(old_path_id)
self.assertEqual(stale_interface.cable_end, 'B')
cable.b_terminations = [new_peer]
cable.save()
# The old CablePath was deleted during retrace.
self.assertFalse(CablePath.objects.filter(pk=old_path_id).exists())
# The stale in-memory instance still holds the deleted FK value.
self.assertEqual(stale_interface._path_id, old_path_id)
# Serialization must not raise ObjectDoesNotExist. Because this interface
# was the former B-side termination, it is now disconnected.
data = serialize_for_event(stale_interface)
self.assertIsNone(data['connected_endpoints'])
self.assertIsNone(data['connected_endpoints_type'])
self.assertFalse(data['connected_endpoints_reachable'])
class VirtualDeviceContextTestCase(TestCase):


@@ -0,0 +1,204 @@
from dcim.models import ConsolePort, Interface, PowerPort
from dcim.tables import *
from utilities.testing import TableTestCases
#
# Sites
#
class RegionTableTest(TableTestCases.OrderableColumnsTestCase):
table = RegionTable
class SiteGroupTableTest(TableTestCases.OrderableColumnsTestCase):
table = SiteGroupTable
class SiteTableTest(TableTestCases.OrderableColumnsTestCase):
table = SiteTable
class LocationTableTest(TableTestCases.OrderableColumnsTestCase):
table = LocationTable
#
# Racks
#
class RackRoleTableTest(TableTestCases.OrderableColumnsTestCase):
table = RackRoleTable
class RackTypeTableTest(TableTestCases.OrderableColumnsTestCase):
table = RackTypeTable
class RackTableTest(TableTestCases.OrderableColumnsTestCase):
table = RackTable
class RackReservationTableTest(TableTestCases.OrderableColumnsTestCase):
table = RackReservationTable
#
# Device types
#
class ManufacturerTableTest(TableTestCases.OrderableColumnsTestCase):
table = ManufacturerTable
class DeviceTypeTableTest(TableTestCases.OrderableColumnsTestCase):
table = DeviceTypeTable
#
# Module types
#
class ModuleTypeProfileTableTest(TableTestCases.OrderableColumnsTestCase):
table = ModuleTypeProfileTable
class ModuleTypeTableTest(TableTestCases.OrderableColumnsTestCase):
table = ModuleTypeTable
class ModuleTableTest(TableTestCases.OrderableColumnsTestCase):
table = ModuleTable
#
# Devices
#
class DeviceRoleTableTest(TableTestCases.OrderableColumnsTestCase):
table = DeviceRoleTable
class PlatformTableTest(TableTestCases.OrderableColumnsTestCase):
table = PlatformTable
class DeviceTableTest(TableTestCases.OrderableColumnsTestCase):
table = DeviceTable
#
# Device components
#
class ConsolePortTableTest(TableTestCases.OrderableColumnsTestCase):
table = ConsolePortTable
class ConsoleServerPortTableTest(TableTestCases.OrderableColumnsTestCase):
table = ConsoleServerPortTable
class PowerPortTableTest(TableTestCases.OrderableColumnsTestCase):
table = PowerPortTable
class PowerOutletTableTest(TableTestCases.OrderableColumnsTestCase):
table = PowerOutletTable
class InterfaceTableTest(TableTestCases.OrderableColumnsTestCase):
table = InterfaceTable
class FrontPortTableTest(TableTestCases.OrderableColumnsTestCase):
table = FrontPortTable
class RearPortTableTest(TableTestCases.OrderableColumnsTestCase):
table = RearPortTable
class ModuleBayTableTest(TableTestCases.OrderableColumnsTestCase):
table = ModuleBayTable
class DeviceBayTableTest(TableTestCases.OrderableColumnsTestCase):
table = DeviceBayTable
class InventoryItemTableTest(TableTestCases.OrderableColumnsTestCase):
table = InventoryItemTable
class InventoryItemRoleTableTest(TableTestCases.OrderableColumnsTestCase):
table = InventoryItemRoleTable
#
# Connections
#
class ConsoleConnectionTableTest(TableTestCases.OrderableColumnsTestCase):
table = ConsoleConnectionTable
queryset_sources = [
('ConsoleConnectionsListView', ConsolePort.objects.filter(_path__is_complete=True)),
]
class PowerConnectionTableTest(TableTestCases.OrderableColumnsTestCase):
table = PowerConnectionTable
queryset_sources = [
('PowerConnectionsListView', PowerPort.objects.filter(_path__is_complete=True)),
]
class InterfaceConnectionTableTest(TableTestCases.OrderableColumnsTestCase):
table = InterfaceConnectionTable
queryset_sources = [
('InterfaceConnectionsListView', Interface.objects.filter(_path__is_complete=True)),
]
#
# Cables
#
class CableTableTest(TableTestCases.OrderableColumnsTestCase):
table = CableTable
#
# Power
#
class PowerPanelTableTest(TableTestCases.OrderableColumnsTestCase):
table = PowerPanelTable
class PowerFeedTableTest(TableTestCases.OrderableColumnsTestCase):
table = PowerFeedTable
#
# Virtual chassis
#
class VirtualChassisTableTest(TableTestCases.OrderableColumnsTestCase):
table = VirtualChassisTable
#
# Virtual device contexts
#
class VirtualDeviceContextTableTest(TableTestCases.OrderableColumnsTestCase):
table = VirtualDeviceContextTable
#
# MAC addresses
#
class MACAddressTableTest(TableTestCases.OrderableColumnsTestCase):
table = MACAddressTable
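The shared `OrderableColumnsTestCase` base class these tests rely on is not shown in this diff. Its core idea — enumerate every sortable column and exercise it in both ascending and descending order, failing if any raises — can be sketched without Django; the class and its inputs below are illustrative stand-ins, not the actual implementation:

```python
class OrderableColumnsSmokeTest:
    """Checks that every orderable column of a table sorts both ways without error."""

    def __init__(self, columns, rows):
        self.columns = columns  # {column name: sort key function} for orderable columns
        self.rows = rows        # the list-view queryset stand-in

    def run(self):
        failures = []
        for name, key in self.columns.items():
            for reverse in (False, True):        # ascending, then descending
                try:
                    sorted(self.rows, key=key, reverse=reverse)
                except Exception as exc:         # any sort/render error fails the column
                    failures.append((name, reverse, exc))
        return failures


rows = [{'name': 'sw1', 'ports': 48}, {'name': 'sw2', 'ports': None}]
smoke = OrderableColumnsSmokeTest(
    {
        'name': lambda r: r['name'],
        # None-safe key: sorts missing values after present ones
        'ports': lambda r: (r['ports'] is None, r['ports']),
    },
    rows,
)
assert smoke.run() == []  # every column orders cleanly in both directions
```

In the real base class, "sorting" means setting `table.order_by` and rendering, so it also catches bad `order_by` accessors like the `device___name` typo fixed above.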


@@ -3,6 +3,9 @@ from collections import defaultdict
from django.apps import apps
from django.contrib.contenttypes.models import ContentType
from django.db import router, transaction
from django.utils.translation import gettext as _
from dcim.constants import MODULE_TOKEN
def compile_path_node(ct_id, object_id):
@@ -33,6 +36,51 @@ def path_node_to_object(repr):
return ct.model_class().objects.filter(pk=object_id).first()
def get_module_bay_positions(module_bay):
"""
Given a module bay, traverse up the module hierarchy and return
a list of bay position strings from root to leaf.
"""
positions = []
while module_bay:
positions.append(module_bay.position)
if module_bay.module:
module_bay = module_bay.module.module_bay
else:
module_bay = None
positions.reverse()
return positions
def resolve_module_placeholder(value, positions):
"""
Resolve {module} placeholder tokens in a string using the given
list of module bay positions (ordered root to leaf).
A single {module} token resolves to the leaf (immediate parent) bay's position.
Multiple tokens must match the tree depth and resolve level-by-level.
Returns the resolved string.
Raises ValueError if token count is greater than 1 and doesn't match tree depth.
"""
if MODULE_TOKEN not in value:
return value
token_count = value.count(MODULE_TOKEN)
if token_count == 1:
return value.replace(MODULE_TOKEN, positions[-1])
if token_count == len(positions):
for pos in positions:
value = value.replace(MODULE_TOKEN, pos, 1)
return value
raise ValueError(
_("Cannot install module with placeholder values: the module bay tree "
"is {level} levels deep but {tokens} placeholders were given.").format(
level=len(positions), tokens=token_count
)
)
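The resolution rules can be exercised in isolation. This is the same logic as the function above with the Django `gettext` wrapper dropped, plus assertions matching the regression test's expectations:

```python
MODULE_TOKEN = '{module}'


def resolve_module_placeholder(value, positions):
    """Resolve {module} tokens against bay positions ordered root to leaf."""
    if MODULE_TOKEN not in value:
        return value
    token_count = value.count(MODULE_TOKEN)
    if token_count == 1:
        # A lone token always resolves to the immediate parent (leaf) bay
        return value.replace(MODULE_TOKEN, positions[-1])
    if token_count == len(positions):
        # One token per level, consumed root to leaf
        for pos in positions:
            value = value.replace(MODULE_TOKEN, pos, 1)
        return value
    raise ValueError(
        f"tree is {len(positions)} levels deep but {token_count} placeholders given"
    )


# Single token at depth 2 resolves to the leaf position (the #20474 case)
assert resolve_module_placeholder('SFP {module}', ['1', '2']) == 'SFP 2'
# Token count matching tree depth resolves level-by-level
assert resolve_module_placeholder('Gi{module}/{module}', ['1', '2']) == 'Gi1/2'
```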
def create_cablepaths(objects):
"""
Create CablePaths for all paths originating from the specified set of nodes.


@@ -1,19 +1,70 @@
from django.utils.translation import gettext as _
import logging
from django.core.files.storage import storages
from django.db import IntegrityError
from django.utils.translation import gettext_lazy as _
from drf_spectacular.utils import extend_schema_field
from rest_framework import serializers
from core.api.serializers_.jobs import JobSerializer
from extras.models import Script
from core.choices import ManagedFileRootPathChoices
from extras.models import Script, ScriptModule
from netbox.api.serializers import ValidatedModelSerializer
from utilities.datetime import local_now
logger = logging.getLogger(__name__)
__all__ = (
'ScriptDetailSerializer',
'ScriptInputSerializer',
'ScriptModuleSerializer',
'ScriptSerializer',
)
class ScriptModuleSerializer(ValidatedModelSerializer):
file = serializers.FileField(write_only=True)
file_path = serializers.CharField(read_only=True)
class Meta:
model = ScriptModule
fields = ['id', 'display', 'file_path', 'file', 'created', 'last_updated']
brief_fields = ('id', 'display')
def validate(self, data):
# ScriptModule.save() sets file_root; inject it here so full_clean() succeeds.
# Pop 'file' before model instantiation — ScriptModule has no such field.
file = data.pop('file', None)
data['file_root'] = ManagedFileRootPathChoices.SCRIPTS
data = super().validate(data)
data.pop('file_root', None)
if file is not None:
data['file'] = file
return data
def create(self, validated_data):
file = validated_data.pop('file')
storage = storages.create_storage(storages.backends["scripts"])
validated_data['file_path'] = storage.save(file.name, file)
created = False
try:
instance = super().create(validated_data)
created = True
return instance
except IntegrityError as e:
if 'file_path' in str(e):
raise serializers.ValidationError(
_("A script module with this file name already exists.")
)
raise
finally:
if not created and (file_path := validated_data.get('file_path')):
try:
storage.delete(file_path)
except Exception:
logger.warning(f"Failed to delete orphaned script file '{file_path}' from storage.")
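The `create()` method above follows a general save-then-rollback pattern: persist the file first, attempt the database insert, and delete the orphaned file only if the insert never succeeded. A Django-free sketch of that shape (the helper and storage class here are hypothetical, not part of the diff):

```python
import logging

logger = logging.getLogger(__name__)


def create_with_file(storage, db_insert, file_name, content):
    """Persist a file, then the DB row; delete the orphaned file on failure."""
    file_path = storage.save(file_name, content)
    created = False
    try:
        row = db_insert(file_path)
        created = True
        return row
    finally:
        # Runs on success and failure alike; clean up only when the row
        # never made it into the database.
        if not created:
            try:
                storage.delete(file_path)
            except Exception:
                logger.warning("Failed to delete orphaned file '%s'", file_path)


class DictStorage:
    """In-memory stand-in for a file storage backend."""

    def __init__(self):
        self.files = {}

    def save(self, name, content):
        self.files[name] = content
        return name

    def delete(self, path):
        del self.files[path]


storage = DictStorage()
try:
    # A failing insert leaves no orphan behind
    create_with_file(storage, lambda path: 1 / 0, 'test.py', b'...')
except ZeroDivisionError:
    pass
assert storage.files == {}
```

The `finally` block (rather than `except`) is what lets the original code clean up after any insert failure while still re-raising non-`file_path` integrity errors unchanged.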
class ScriptSerializer(ValidatedModelSerializer):
description = serializers.SerializerMethodField(read_only=True)
vars = serializers.SerializerMethodField(read_only=True)


@@ -26,6 +26,7 @@ router.register('journal-entries', views.JournalEntryViewSet)
router.register('config-contexts', views.ConfigContextViewSet)
router.register('config-context-profiles', views.ConfigContextProfileViewSet)
router.register('config-templates', views.ConfigTemplateViewSet)
router.register('scripts/upload', views.ScriptModuleViewSet)
router.register('scripts', views.ScriptViewSet, basename='script')
app_name = 'extras-api'


@@ -6,7 +6,7 @@ from rest_framework import status
from rest_framework.decorators import action
from rest_framework.exceptions import PermissionDenied
from rest_framework.generics import RetrieveUpdateDestroyAPIView
from rest_framework.mixins import ListModelMixin, RetrieveModelMixin
from rest_framework.mixins import CreateModelMixin, ListModelMixin, RetrieveModelMixin
from rest_framework.renderers import JSONRenderer
from rest_framework.response import Response
from rest_framework.routers import APIRootView
@@ -21,6 +21,7 @@ from netbox.api.features import SyncedDataMixin
from netbox.api.metadata import ContentTypeMetadata
from netbox.api.renderers import TextRenderer
from netbox.api.viewsets import BaseViewSet, NetBoxModelViewSet
from netbox.api.viewsets.mixins import ObjectValidationMixin
from utilities.exceptions import RQWorkerNotRunningException
from utilities.request import copy_safe_request
@@ -264,6 +265,11 @@ class ConfigTemplateViewSet(SyncedDataMixin, ConfigTemplateRenderMixin, NetBoxMo
# Scripts
#
class ScriptModuleViewSet(ObjectValidationMixin, CreateModelMixin, BaseViewSet):
queryset = ScriptModule.objects.all()
serializer_class = serializers.ScriptModuleSerializer
@extend_schema_view(
update=extend_schema(request=serializers.ScriptInputSerializer),
partial_update=extend_schema(request=serializers.ScriptInputSerializer),


@@ -25,16 +25,54 @@ logger = logging.getLogger('netbox.events_processor')
class EventContext(UserDict):
"""
A custom dictionary that automatically serializes its associated object on demand.
Dictionary-compatible wrapper for queued events that lazily serializes
``event['data']`` on first access.
Backward-compatible with the plain-dict interface expected by existing
EVENTS_PIPELINE consumers. When the same object is enqueued more than once
in a single request, the serialization source is updated so consumers see
the latest state.
"""
# We're emulating a dictionary here (rather than using a custom class) because prior to NetBox v4.5.2, events were
# queued as dictionaries for processing by handlers in EVENTS_PIPELINE. We need to avoid introducing any breaking
# changes until a suitable minor release.
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Track which model instance should be serialized if/when `data` is
# requested. This may be refreshed on duplicate enqueue, while leaving
# the public `object` entry untouched for compatibility.
self._serialization_source = None
if 'object' in self:
self._serialization_source = super().__getitem__('object')
def refresh_serialization_source(self, instance):
"""
Point lazy serialization at a fresher instance, invalidating any
already-materialized ``data``.
"""
self._serialization_source = instance
# UserDict.__contains__ checks the backing dict directly, so `in`
# does not trigger __getitem__'s lazy serialization.
if 'data' in self:
del self['data']
def freeze_data(self, instance):
"""
Eagerly serialize and cache the payload for delete events, where the
object may become inaccessible after deletion.
"""
super().__setitem__('data', serialize_for_event(instance))
self._serialization_source = None
def __getitem__(self, item):
if item == 'data' and 'data' not in self:
data = serialize_for_event(self['object'])
self.__setitem__('data', data)
# Materialize the payload only when an event consumer asks for it.
#
# On coalesced events, use the latest explicitly queued instance so
# webhooks/scripts/notifications observe the final queued state for
# that object within the request.
source = self._serialization_source or super().__getitem__('object')
super().__setitem__('data', serialize_for_event(source))
return super().__getitem__(item)
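Stripped of the event-specific details, `EventContext` is a `UserDict` that materializes one key lazily from a swappable source and invalidates the cached value when that source changes. A minimal sketch of just that mechanism (class and parameter names are illustrative):

```python
from collections import UserDict


class LazyPayloadDict(UserDict):
    """Materializes 'data' from a serialization source on first access."""

    def __init__(self, serializer, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._serializer = serializer
        self._source = self.data.get('object')

    def refresh_serialization_source(self, instance):
        """Point lazy serialization at a fresher instance; drop any cached payload."""
        self._source = instance
        # UserDict.__contains__ checks the backing dict directly, so this
        # membership test does not trigger __getitem__'s lazy serialization.
        if 'data' in self:
            del self['data']

    def __getitem__(self, item):
        if item == 'data' and 'data' not in self.data:
            super().__setitem__('data', self._serializer(self._source))
        return super().__getitem__(item)


calls = []
event = LazyPayloadDict(
    lambda obj: calls.append(obj) or f'payload:{obj}',
    object='v1',
)
assert 'data' not in event.data            # nothing serialized yet
event.refresh_serialization_source('v2')
assert event['data'] == 'payload:v2'       # serialized lazily, from the fresher source
assert calls == ['v2']                     # the stale 'v1' source was never serialized
```

The real class layers delete-event freezing on top of this: `freeze_data()` eagerly stores the payload and clears the source, so deletion of the underlying row can no longer break serialization.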
@@ -76,8 +114,9 @@ def get_snapshots(instance, event_type):
def enqueue_event(queue, instance, request, event_type):
"""
Enqueue a serialized representation of a created/updated/deleted object for the processing of
events once the request has completed.
Enqueue (or coalesce) an event for a created/updated/deleted object.
Events are processed after the request completes.
"""
# Bail if this type of object does not support event rules
if not has_feature(instance, 'event_rules'):
@@ -88,11 +127,18 @@ def enqueue_event(queue, instance, request, event_type):
assert instance.pk is not None
key = f'{app_label}.{model_name}:{instance.pk}'
if key in queue:
queue[key]['snapshots']['postchange'] = get_snapshots(instance, event_type)['postchange']
# If the object is being deleted, update any prior "update" event to "delete"
# If the object is being deleted, convert any prior update event into a
# delete event and freeze the payload before the object (or related
# rows) become inaccessible.
if event_type == OBJECT_DELETED:
queue[key]['event_type'] = event_type
else:
# Keep the public `object` entry stable for compatibility.
queue[key].refresh_serialization_source(instance)
else:
queue[key] = EventContext(
object_type=ObjectType.objects.get_for_model(instance),
@@ -106,9 +152,11 @@ def enqueue_event(queue, instance, request, event_type):
username=request.user.username, # DEPRECATED, will be removed in NetBox v4.7.0
request_id=request.id, # DEPRECATED, will be removed in NetBox v4.7.0
)
# Force serialization of objects prior to them actually being deleted
# For delete events, eagerly serialize the payload before the row is gone.
# This covers both first-time enqueues and coalesced update→delete promotions.
if event_type == OBJECT_DELETED:
queue[key]['data'] = serialize_for_event(instance)
queue[key].freeze_data(instance)
def process_event_rules(event_rules, object_type, event):
@@ -133,9 +181,9 @@ def process_event_rules(event_rules, object_type, event):
if not event_rule.eval_conditions(event['data']):
continue
# Compile event data
event_data = event_rule.action_data or {}
event_data.update(event['data'])
# Merge rule-specific action_data with the event payload.
# Copy to avoid mutating the rule's stored action_data dict.
event_data = {**(event_rule.action_data or {}), **event['data']}
# Webhooks
if event_rule.action_type == EventRuleActionChoices.WEBHOOK:


@@ -417,6 +417,7 @@ class NotificationTable(NetBoxTable):
icon = columns.TemplateColumn(
template_code=NOTIFICATION_ICON,
accessor=tables.A('event'),
orderable=False,
attrs={
'td': {'class': 'w-1'},
'th': {'class': 'w-1'},
@@ -479,8 +480,8 @@ class WebhookTable(NetBoxTable):
verbose_name=_('Name'),
linkify=True
)
ssl_validation = columns.BooleanColumn(
verbose_name=_('SSL Validation')
ssl_verification = columns.BooleanColumn(
verbose_name=_('SSL Verification'),
)
owner = tables.Column(
linkify=True,


@@ -1,7 +1,9 @@
import datetime
import hashlib
from unittest.mock import MagicMock, patch
from django.contrib.contenttypes.models import ContentType
from django.core.files.uploadedfile import SimpleUploadedFile
from django.urls import reverse
from django.utils.timezone import make_aware, now
from rest_framework import status
@@ -1384,3 +1386,54 @@ class NotificationTest(APIViewTestCases.APIViewTestCase):
'event_type': OBJECT_DELETED,
},
]
class ScriptModuleTest(APITestCase):
"""
Tests for the POST /api/extras/scripts/upload/ endpoint.
ScriptModule is a proxy of core.ManagedFile (a different app) so the standard
APIViewTestCases mixins cannot be used directly. All tests use add_permissions()
with explicit Django model-level permissions.
"""
def setUp(self):
super().setUp()
self.url = reverse('extras-api:scriptmodule-list') # /api/extras/scripts/upload/
def test_upload_script_module_without_permission(self):
script_content = b"from extras.scripts import Script\nclass TestScript(Script):\n pass\n"
upload_file = SimpleUploadedFile('test_upload.py', script_content, content_type='text/plain')
response = self.client.post(
self.url,
{'file': upload_file},
format='multipart',
**self.header,
)
self.assertHttpStatus(response, status.HTTP_403_FORBIDDEN)
def test_upload_script_module(self):
# ScriptModule is a proxy of core.ManagedFile; both permissions required.
self.add_permissions('extras.add_scriptmodule', 'core.add_managedfile')
script_content = b"from extras.scripts import Script\nclass TestScript(Script):\n pass\n"
upload_file = SimpleUploadedFile('test_upload.py', script_content, content_type='text/plain')
mock_storage = MagicMock()
mock_storage.save.return_value = 'test_upload.py'
with patch('extras.api.serializers_.scripts.storages') as mock_storages:
mock_storages.create_storage.return_value = mock_storage
mock_storages.backends = {'scripts': {}}
response = self.client.post(
self.url,
{'file': upload_file},
format='multipart',
**self.header,
)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(response.data['file_path'], 'test_upload.py')
mock_storage.save.assert_called_once()
self.assertTrue(ScriptModule.objects.filter(file_path='test_upload.py').exists())
def test_upload_script_module_without_file_fails(self):
self.add_permissions('extras.add_scriptmodule', 'core.add_managedfile')
response = self.client.post(self.url, {}, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)


@@ -1,8 +1,10 @@
import json
import uuid
from unittest import skipIf
from unittest.mock import Mock, patch
import django_rq
from django.conf import settings
from django.http import HttpResponse
from django.test import RequestFactory
from django.urls import reverse
@@ -343,6 +345,7 @@ class EventRuleTest(APITestCase):
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], sites[i].name)
self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
@skipIf('netbox.tests.dummy_plugin' not in settings.PLUGINS, 'dummy_plugin not in settings.PLUGINS')
def test_send_webhook(self):
request_id = uuid.uuid4()
@@ -426,6 +429,97 @@ class EventRuleTest(APITestCase):
self.assertEqual(job.kwargs['object_type'], script_type)
self.assertEqual(job.kwargs['username'], self.user.username)
def test_duplicate_enqueue_refreshes_lazy_payload(self):
"""
When the same object is enqueued more than once in a single request,
lazy serialization should use the most recently enqueued instance while
preserving the original event['object'] reference.
"""
request = RequestFactory().get(reverse('dcim:site_add'))
request.id = uuid.uuid4()
request.user = self.user
site = Site.objects.create(name='Site 1', slug='site-1')
stale_site = Site.objects.get(pk=site.pk)
queue = {}
enqueue_event(queue, stale_site, request, OBJECT_UPDATED)
event = queue[f'dcim.site:{site.pk}']
# Data should not be materialized yet (lazy serialization)
self.assertNotIn('data', event.data)
fresh_site = Site.objects.get(pk=site.pk)
fresh_site.description = 'foo'
fresh_site.save()
enqueue_event(queue, fresh_site, request, OBJECT_UPDATED)
# The original object reference should be preserved
self.assertIs(event['object'], stale_site)
# But serialized data should reflect the fresher instance
self.assertEqual(event['data']['description'], 'foo')
self.assertEqual(event['snapshots']['postchange']['description'], 'foo')
def test_duplicate_enqueue_invalidates_materialized_data(self):
"""
If event['data'] has already been materialized before a second enqueue
for the same object, the stale payload should be discarded and rebuilt
from the fresher instance on next access.
"""
request = RequestFactory().get(reverse('dcim:site_add'))
request.id = uuid.uuid4()
request.user = self.user
site = Site.objects.create(name='Site 1', slug='site-1')
queue = {}
enqueue_event(queue, site, request, OBJECT_UPDATED)
event = queue[f'dcim.site:{site.pk}']
# Force early materialization
self.assertEqual(event['data']['description'], '')
# Now update and re-enqueue
fresh_site = Site.objects.get(pk=site.pk)
fresh_site.description = 'updated'
fresh_site.save()
enqueue_event(queue, fresh_site, request, OBJECT_UPDATED)
# Stale data should have been invalidated; new access should reflect update
self.assertEqual(event['data']['description'], 'updated')
def test_update_then_delete_enqueue_freezes_payload(self):
"""
When an update event is coalesced with a subsequent delete, the event
type should be promoted to OBJECT_DELETED and the payload should be
eagerly frozen (since the object will be inaccessible after deletion).
"""
request = RequestFactory().get(reverse('dcim:site_add'))
request.id = uuid.uuid4()
request.user = self.user
site = Site.objects.create(name='Site 1', slug='site-1')
queue = {}
enqueue_event(queue, site, request, OBJECT_UPDATED)
event = queue[f'dcim.site:{site.pk}']
enqueue_event(queue, site, request, OBJECT_DELETED)
# Event type should have been promoted
self.assertEqual(event['event_type'], OBJECT_DELETED)
# Data should already be materialized (frozen), not lazy
self.assertIn('data', event.data)
self.assertEqual(event['data']['name'], 'Site 1')
self.assertIsNone(event['snapshots']['postchange'])
def test_duplicate_triggers(self):
"""
Test for erroneous duplicate event triggers resulting from saving an object multiple times


@@ -1,24 +1,84 @@
from django.test import RequestFactory, TestCase, tag
from extras.models import EventRule
from extras.tables import EventRuleTable
from extras.models import Bookmark, Notification, Subscription
from extras.tables import *
from utilities.testing import TableTestCases
@tag('regression')
class EventRuleTableTest(TestCase):
def test_every_orderable_field_does_not_throw_exception(self):
rule = EventRule.objects.all()
disallowed = {
'actions',
}
class CustomFieldTableTest(TableTestCases.OrderableColumnsTestCase):
table = CustomFieldTable
orderable_columns = [
column.name for column in EventRuleTable(rule).columns if column.orderable and column.name not in disallowed
]
fake_request = RequestFactory().get('/')
for col in orderable_columns:
for direction in ('-', ''):
table = EventRuleTable(rule)
table.order_by = f'{direction}{col}'
table.as_html(fake_request)
class CustomFieldChoiceSetTableTest(TableTestCases.OrderableColumnsTestCase):
table = CustomFieldChoiceSetTable
class CustomLinkTableTest(TableTestCases.OrderableColumnsTestCase):
table = CustomLinkTable
class ExportTemplateTableTest(TableTestCases.OrderableColumnsTestCase):
table = ExportTemplateTable
class SavedFilterTableTest(TableTestCases.OrderableColumnsTestCase):
table = SavedFilterTable
class TableConfigTableTest(TableTestCases.OrderableColumnsTestCase):
table = TableConfigTable
class BookmarkTableTest(TableTestCases.OrderableColumnsTestCase):
table = BookmarkTable
queryset_sources = [
('BookmarkListView', Bookmark.objects.all()),
]
class NotificationGroupTableTest(TableTestCases.OrderableColumnsTestCase):
table = NotificationGroupTable
class NotificationTableTest(TableTestCases.OrderableColumnsTestCase):
table = NotificationTable
queryset_sources = [
('NotificationListView', Notification.objects.all()),
]
class SubscriptionTableTest(TableTestCases.OrderableColumnsTestCase):
table = SubscriptionTable
queryset_sources = [
('SubscriptionListView', Subscription.objects.all()),
]
class WebhookTableTest(TableTestCases.OrderableColumnsTestCase):
table = WebhookTable
class EventRuleTableTest(TableTestCases.OrderableColumnsTestCase):
table = EventRuleTable
class TagTableTest(TableTestCases.OrderableColumnsTestCase):
table = TagTable
class ConfigContextProfileTableTest(TableTestCases.OrderableColumnsTestCase):
table = ConfigContextProfileTable
class ConfigContextTableTest(TableTestCases.OrderableColumnsTestCase):
table = ConfigContextTable
class ConfigTemplateTableTest(TableTestCases.OrderableColumnsTestCase):
table = ConfigTemplateTable
class ImageAttachmentTableTest(TableTestCases.OrderableColumnsTestCase):
table = ImageAttachmentTable
class JournalEntryTableTest(TableTestCases.OrderableColumnsTestCase):
table = JournalEntryTable


@@ -247,6 +247,6 @@ class VLANTranslationRuleTable(NetBoxTable):
class Meta(NetBoxTable.Meta):
model = VLANTranslationRule
fields = (
'pk', 'id', 'name', 'policy', 'local_vid', 'remote_vid', 'description', 'tags', 'created', 'last_updated',
'pk', 'id', 'policy', 'local_vid', 'remote_vid', 'description', 'tags', 'created', 'last_updated',
)
default_columns = ('pk', 'policy', 'local_vid', 'remote_vid', 'description')


@@ -1,9 +1,10 @@
from django.test import RequestFactory, TestCase
from netaddr import IPNetwork
from ipam.models import IPAddress, IPRange, Prefix
from ipam.tables import AnnotatedIPAddressTable
from ipam.models import FHRPGroupAssignment, IPAddress, IPRange, Prefix
from ipam.tables import *
from ipam.utils import annotate_ip_space
from utilities.testing import TableTestCases
class AnnotatedIPAddressTableTest(TestCase):
@@ -168,3 +169,82 @@ class AnnotatedIPAddressTableTest(TestCase):
        # Pools are fully usable
        self.assertEqual(available.first_ip, '2001:db8:1::/126')
        self.assertEqual(available.size, 4)


#
# Table ordering tests
#

class VRFTableTest(TableTestCases.OrderableColumnsTestCase):
    table = VRFTable


class RouteTargetTableTest(TableTestCases.OrderableColumnsTestCase):
    table = RouteTargetTable


class RIRTableTest(TableTestCases.OrderableColumnsTestCase):
    table = RIRTable


class AggregateTableTest(TableTestCases.OrderableColumnsTestCase):
    table = AggregateTable


class RoleTableTest(TableTestCases.OrderableColumnsTestCase):
    table = RoleTable


class PrefixTableTest(TableTestCases.OrderableColumnsTestCase):
    table = PrefixTable


class IPRangeTableTest(TableTestCases.OrderableColumnsTestCase):
    table = IPRangeTable


class IPAddressTableTest(TableTestCases.OrderableColumnsTestCase):
    table = IPAddressTable


class FHRPGroupTableTest(TableTestCases.OrderableColumnsTestCase):
    table = FHRPGroupTable


class FHRPGroupAssignmentTableTest(TableTestCases.OrderableColumnsTestCase):
    table = FHRPGroupAssignmentTable
    queryset_sources = [
        ('FHRPGroupAssignmentTable', FHRPGroupAssignment.objects.all()),
    ]


class VLANGroupTableTest(TableTestCases.OrderableColumnsTestCase):
    table = VLANGroupTable


class VLANTableTest(TableTestCases.OrderableColumnsTestCase):
    table = VLANTable


class VLANTranslationPolicyTableTest(TableTestCases.OrderableColumnsTestCase):
    table = VLANTranslationPolicyTable


class VLANTranslationRuleTableTest(TableTestCases.OrderableColumnsTestCase):
    table = VLANTranslationRuleTable


class ASNRangeTableTest(TableTestCases.OrderableColumnsTestCase):
    table = ASNRangeTable


class ASNTableTest(TableTestCases.OrderableColumnsTestCase):
    table = ASNTable


class ServiceTemplateTableTest(TableTestCases.OrderableColumnsTestCase):
    table = ServiceTemplateTable


class ServiceTableTest(TableTestCases.OrderableColumnsTestCase):
    table = ServiceTable


@@ -20,6 +20,10 @@ PLUGINS = [
     'netbox.tests.dummy_plugin',
 ]

+RQ = {
+    'COMMIT_MODE': 'auto',
+}
+
 REDIS = {
     'tasks': {
         'HOST': 'localhost',


@@ -168,6 +168,7 @@ REMOTE_AUTH_USER_FIRST_NAME = getattr(configuration, 'REMOTE_AUTH_USER_FIRST_NAM
 REMOTE_AUTH_USER_LAST_NAME = getattr(configuration, 'REMOTE_AUTH_USER_LAST_NAME', 'HTTP_REMOTE_USER_LAST_NAME')

 # Required by extras/migrations/0109_script_models.py
 REPORTS_ROOT = getattr(configuration, 'REPORTS_ROOT', os.path.join(BASE_DIR, 'reports')).rstrip('/')
+RQ = getattr(configuration, 'RQ', {})
 RQ_DEFAULT_TIMEOUT = getattr(configuration, 'RQ_DEFAULT_TIMEOUT', 300)
 RQ_RETRY_INTERVAL = getattr(configuration, 'RQ_RETRY_INTERVAL', 60)
 RQ_RETRY_MAX = getattr(configuration, 'RQ_RETRY_MAX', 0)


@@ -12,7 +12,7 @@
<th scope="row"><i class="mdi mdi-chip"></i> {% trans "Memory" %}</th>
<td>
{% if memory_sum %}
-<span title={{ memory_sum }}>{{ memory_sum|humanize_ram_megabytes }}</span>
+<span title={{ memory_sum }}>{{ memory_sum|humanize_ram_capacity }}</span>
{% else %}
{{ ''|placeholder }}
{% endif %}
@@ -24,7 +24,7 @@
</th>
<td>
{% if disk_sum %}
-{{ disk_sum|humanize_disk_megabytes }}
+{{ disk_sum|humanize_disk_capacity }}
{% else %}
{{ ''|placeholder }}
{% endif %}


@@ -12,7 +12,7 @@
<th scope="row"><i class="mdi mdi-chip"></i> {% trans "Memory" %}</th>
<td>
{% if object.memory %}
-<span title={{ object.memory }}>{{ object.memory|humanize_ram_megabytes }}</span>
+<span title={{ object.memory }}>{{ object.memory|humanize_ram_capacity }}</span>
{% else %}
{{ ''|placeholder }}
{% endif %}
@@ -24,7 +24,7 @@
</th>
<td>
{% if object.disk %}
-{{ object.disk|humanize_disk_megabytes }}
+{{ object.disk|humanize_disk_capacity }}
{% else %}
{{ ''|placeholder }}
{% endif %}


@@ -1,2 +1,2 @@
{% load helpers %}
-{{ value|humanize_disk_megabytes }}
+{{ value|humanize_disk_capacity }}


@@ -0,0 +1,26 @@
from tenancy.tables import *
from utilities.testing import TableTestCases


class TenantGroupTableTest(TableTestCases.OrderableColumnsTestCase):
    table = TenantGroupTable


class TenantTableTest(TableTestCases.OrderableColumnsTestCase):
    table = TenantTable


class ContactGroupTableTest(TableTestCases.OrderableColumnsTestCase):
    table = ContactGroupTable


class ContactRoleTableTest(TableTestCases.OrderableColumnsTestCase):
    table = ContactRoleTable


class ContactTableTest(TableTestCases.OrderableColumnsTestCase):
    table = ContactTable


class ContactAssignmentTableTest(TableTestCases.OrderableColumnsTestCase):
    table = ContactAssignmentTable

File diff suppressed because it is too large


@@ -1,24 +1,26 @@
-from django.test import RequestFactory, TestCase, tag
-
-from users.models import Token
-from users.tables import TokenTable
+from users.tables import *
+from utilities.testing import TableTestCases


-class TokenTableTest(TestCase):
-    @tag('regression')
-    def test_every_orderable_field_does_not_throw_exception(self):
-        tokens = Token.objects.all()
-        disallowed = {'actions'}
-
-        orderable_columns = [
-            column.name for column in TokenTable(tokens).columns
-            if column.orderable and column.name not in disallowed
-        ]
-        fake_request = RequestFactory().get("/")
-
-        for col in orderable_columns:
-            for direction in ('-', ''):
-                with self.subTest(col=col, direction=direction):
-                    table = TokenTable(tokens)
-                    table.order_by = f'{direction}{col}'
-                    table.as_html(fake_request)
+class TokenTableTest(TableTestCases.OrderableColumnsTestCase):
+    table = TokenTable
+
+
+class UserTableTest(TableTestCases.OrderableColumnsTestCase):
+    table = UserTable
+
+
+class GroupTableTest(TableTestCases.OrderableColumnsTestCase):
+    table = GroupTable
+
+
+class ObjectPermissionTableTest(TableTestCases.OrderableColumnsTestCase):
+    table = ObjectPermissionTable
+
+
+class OwnerGroupTableTest(TableTestCases.OrderableColumnsTestCase):
+    table = OwnerGroupTable
+
+
+class OwnerTableTest(TableTestCases.OrderableColumnsTestCase):
+    table = OwnerTable


@@ -14,6 +14,7 @@ __all__ = (
     'expand_alphanumeric_pattern',
     'expand_ipaddress_pattern',
     'form_from_model',
+    'get_capacity_unit_label',
     'get_field_value',
     'get_selected_values',
     'parse_alphanumeric_range',
@@ -130,6 +131,13 @@ def expand_ipaddress_pattern(string, family):
         yield ''.join([lead, format(i, 'x' if family == 6 else 'd'), remnant])


+def get_capacity_unit_label(divisor=1000):
+    """
+    Return the appropriate base unit label: 'MiB' for binary (1024), 'MB' for decimal (1000).
+    """
+    return 'MiB' if divisor == 1024 else 'MB'
+
+
 def get_field_value(form, field_name):
     """
     Return the current bound or initial value associated with a form field, prior to calling

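The new `get_capacity_unit_label()` helper above is what the form changes later in this diff use to build unit-aware field labels. A minimal stand-alone sketch of that pattern (the `Memory ({unit})` / `Disk ({unit})` label text mirrors the bulk-edit form changes; the concrete divisor values stand in for the `RAM_BASE_UNIT` / `DISK_BASE_UNIT` settings):

```python
def get_capacity_unit_label(divisor=1000):
    """Return 'MiB' for a binary divisor (1024), 'MB' otherwise (SI)."""
    return 'MiB' if divisor == 1024 else 'MB'


# Build unit-aware labels the way the virtualization forms do,
# with stand-in values for the RAM_BASE_UNIT / DISK_BASE_UNIT settings.
ram_label = 'Memory ({unit})'.format(unit=get_capacity_unit_label(1024))
disk_label = 'Disk ({unit})'.format(unit=get_capacity_unit_label(1000))
```

This keeps the model fields unit-agnostic while the UI reflects whichever convention the deployment configures.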

@@ -48,11 +48,13 @@ class FilterModifierWidget(forms.Widget):
         Just the value string for form validation. The modifier is reconstructed
         during rendering from the query parameter names.
         """
-        # Special handling for empty - check if field__empty exists
+        # Special handling for empty modifier: return None so the underlying field does not
+        # attempt to validate 'true'/'false' as a field value (e.g. a model PK). The
+        # `__empty` query parameter is consumed directly by the filterset and by
+        # `applied_filters`, so no value from the field itself is needed here.
         empty_param = f"{name}__empty"
         if empty_param in data:
-            # Return the boolean value for empty lookup
-            return data.get(empty_param)
+            return None

         # Try exact field name first
         value = self.original_widget.value_from_datadict(data, files, name)
@@ -113,8 +115,13 @@ class FilterModifierWidget(forms.Widget):
         # Build a minimal choice list with just the selected values
         choices = []
         if pk_values:
-            selected_objects = original_choices.queryset.filter(pk__in=pk_values)
-            choices = [(obj.pk, str(obj)) for obj in selected_objects]
+            try:
+                selected_objects = original_choices.queryset.filter(pk__in=pk_values)
+                choices = [(obj.pk, str(obj)) for obj in selected_objects]
+            except (ValueError, TypeError):
+                # pk_values may contain non-PK strings (e.g. 'true'/'false' from the
+                # empty modifier); silently skip rendering selected choices in that case.
+                pass

         # Re-add the "None" option if it was selected via the null choice value
         if settings.FILTERS_NULL_CHOICE_VALUE in values:

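The behavior described in the comment above can be reduced to a small sketch: when a `<field>__empty` query parameter is present, the widget reports no value at all, leaving the parameter for the filterset to consume. This stand-alone function only illustrates the control flow; the real logic lives in `FilterModifierWidget.value_from_datadict()`, which also takes a `files` argument and falls back through several name variants:

```python
def value_from_datadict(data, name):
    # Illustrative reduction of the empty-modifier handling (not the
    # real widget API). If the empty modifier is present, report no
    # value: 'true'/'false' belongs to the filterset, not the field.
    empty_param = f"{name}__empty"
    if empty_param in data:
        return None
    # Otherwise fall back to the exact field name as usual
    return data.get(name)


# With the empty modifier present, the field sees no value to validate:
print(value_from_datadict({'cf_site__empty': 'true'}, 'cf_site'))  # None
# Without it, the exact field name is consulted as usual:
print(value_from_datadict({'cf_site': '3'}, 'cf_site'))            # 3
```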

@@ -20,8 +20,8 @@ __all__ = (
'divide',
'get_item',
'get_key',
-    'humanize_disk_megabytes',
-    'humanize_ram_megabytes',
+    'humanize_disk_capacity',
+    'humanize_ram_capacity',
'humanize_speed',
'icon_from_status',
'kg_to_pounds',
@@ -208,42 +208,52 @@ def humanize_speed(speed):
         return '{} Kbps'.format(speed)


-def _humanize_megabytes(mb, divisor=1000):
+def _humanize_capacity(value, divisor=1000):
     """
-    Express a number of megabytes in the most suitable unit (e.g. gigabytes, terabytes, etc.).
+    Express a capacity value in the most suitable unit (e.g. GB, TiB, etc.).
+
+    The value is treated as a unitless base-unit quantity; the divisor determines
+    both the scaling thresholds and the label convention:
+
+    - 1000: SI labels (MB, GB, TB, PB)
+    - 1024: IEC labels (MiB, GiB, TiB, PiB)
     """
-    if not mb:
+    if not value:
         return ""
+
+    if divisor == 1024:
+        labels = ('MiB', 'GiB', 'TiB', 'PiB')
+    else:
+        labels = ('MB', 'GB', 'TB', 'PB')
+
     PB_SIZE = divisor**3
     TB_SIZE = divisor**2
     GB_SIZE = divisor
-    if mb >= PB_SIZE:
-        return f"{mb / PB_SIZE:.2f} PB"
-    if mb >= TB_SIZE:
-        return f"{mb / TB_SIZE:.2f} TB"
-    if mb >= GB_SIZE:
-        return f"{mb / GB_SIZE:.2f} GB"
-    return f"{mb} MB"
+    if value >= PB_SIZE:
+        return f"{value / PB_SIZE:.2f} {labels[3]}"
+    if value >= TB_SIZE:
+        return f"{value / TB_SIZE:.2f} {labels[2]}"
+    if value >= GB_SIZE:
+        return f"{value / GB_SIZE:.2f} {labels[1]}"
+    return f"{value} {labels[0]}"
 @register.filter()
-def humanize_disk_megabytes(mb):
+def humanize_disk_capacity(value):
     """
-    Express a number of megabytes in the most suitable unit (e.g. gigabytes, terabytes, etc.).
-    Use the DISK_BASE_UNIT setting to determine the divisor. Default is 1000.
+    Express a disk capacity in the most suitable unit, using the DISK_BASE_UNIT
+    setting to select SI (MB/GB) or IEC (MiB/GiB) labels.
     """
-    return _humanize_megabytes(mb, DISK_BASE_UNIT)
+    return _humanize_capacity(value, DISK_BASE_UNIT)


 @register.filter()
-def humanize_ram_megabytes(mb):
+def humanize_ram_capacity(value):
     """
-    Express a number of megabytes in the most suitable unit (e.g. gigabytes, terabytes, etc.).
-    Use the RAM_BASE_UNIT setting to determine the divisor. Default is 1000.
+    Express a RAM capacity in the most suitable unit, using the RAM_BASE_UNIT
+    setting to select SI (MB/GB) or IEC (MiB/GiB) labels.
     """
-    return _humanize_megabytes(mb, RAM_BASE_UNIT)
+    return _humanize_capacity(value, RAM_BASE_UNIT)
@register.filter()
@@ -481,6 +491,35 @@ def applied_filters(context, model, form, query_params):
'link_text': link_text,
})
+    # Handle empty modifier pills separately. `FilterModifierWidget.value_from_datadict()`
+    # returns None for fields with a `field__empty` query parameter so that the underlying
+    # form field does not attempt to validate 'true'/'false' as a real field value (which
+    # would raise a ValidationError for ModelChoiceField). Because the value is None, these
+    # fields never appear in `form.changed_data`, so we build their pills directly from the
+    # query parameters here.
+    for param_name, param_value in query_params.items():
+        if not param_name.endswith('__empty'):
+            continue
+        field_name = param_name[:-len('__empty')]
+        if field_name not in form.fields or field_name == 'filter_id':
+            continue
+        querydict = query_params.copy()
+        querydict.pop(param_name)
+        label = form.fields[field_name].label or field_name
+        if param_value.lower() in ('true', '1'):
+            link_text = f'{label} {_("is empty")}'
+        else:
+            link_text = f'{label} {_("is not empty")}'
+        applied_filters.append({
+            'name': param_name,
+            'value': param_value,
+            'link_url': f'?{querydict.urlencode()}',
+            'link_text': link_text,
+        })
save_link = None
if user.has_perm('extras.add_savedfilter') and 'filter_id' not in context['request'].GET:
object_type = ObjectType.objects.get_for_model(model).pk

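The filter rename above keeps the scaling logic in one place; as a quick reference, the divisor-driven behavior can be sketched stand-alone (same thresholds and label sets as `_humanize_capacity`, with a loop replacing the unrolled comparisons):

```python
def humanize_capacity(value, divisor=1000):
    # Same shape as _humanize_capacity in the diff: the divisor sets both
    # the scaling thresholds and the label convention (SI vs IEC).
    if not value:
        return ''
    labels = ('MiB', 'GiB', 'TiB', 'PiB') if divisor == 1024 else ('MB', 'GB', 'TB', 'PB')
    # Check PB/PiB, then TB/TiB, then GB/GiB thresholds in turn
    for power in (3, 2, 1):
        if value >= divisor ** power:
            return f'{value / divisor ** power:.2f} {labels[power]}'
    return f'{value} {labels[0]}'


print(humanize_capacity(2000))                # 2.00 GB
print(humanize_capacity(2048, divisor=1024))  # 2.00 GiB
print(humanize_capacity(500))                 # 500 MB
```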

@@ -1,5 +1,6 @@
 from .api import *
 from .base import *
 from .filtersets import *
+from .tables import *
 from .utils import *
 from .views import *


@@ -0,0 +1,130 @@
import inspect
from importlib import import_module

from django.test import RequestFactory

from netbox.views import generic

from .base import TestCase

__all__ = (
    "ModelTableTestCase",
    "TableTestCases",
)


class ModelTableTestCase(TestCase):
    """
    Shared helpers for model-backed table tests.

    Concrete subclasses should set `table` and may override `get_queryset()`
    or `excluded_orderable_columns` as needed.
    """
    table = None
    excluded_orderable_columns = frozenset({"actions"})

    # Optional explicit override for odd cases
    queryset_sources = None

    # Only these view types are considered sortable queryset sources by default
    queryset_source_view_classes = (generic.ObjectListView,)

    @classmethod
    def validate_table_test_case(cls):
        if cls.table is None:
            raise AssertionError(f"{cls.__name__} must define `table`")
        if getattr(cls.table._meta, "model", None) is None:
            raise AssertionError(f"{cls.__name__}.table must be model-backed")

    def get_request(self):
        request = RequestFactory().get("/")
        request.user = self.user
        return request

    def get_table(self, queryset):
        return self.table(queryset)

    @classmethod
    def is_queryset_source_view(cls, view):
        model = cls.table._meta.model
        app_label = model._meta.app_label
        return (
            inspect.isclass(view)
            and view.__module__.startswith(f"{app_label}.views")
            and getattr(view, "table", None) is cls.table
            and getattr(view, "queryset", None) is not None
            and issubclass(view, cls.queryset_source_view_classes)
        )

    @classmethod
    def get_queryset_sources(cls):
        """
        Return an iterable of (label, queryset) pairs to test.

        By default, only discover list-style views that declare this table.
        That keeps bulk edit/delete confirmation tables out of the ordering
        smoke test.
        """
        if cls.queryset_sources is not None:
            return tuple(cls.queryset_sources)
        model = cls.table._meta.model
        app_label = model._meta.app_label
        module = import_module(f"{app_label}.views")
        sources = []
        for _, view in inspect.getmembers(module, inspect.isclass):
            if not cls.is_queryset_source_view(view):
                continue
            queryset = view.queryset
            if hasattr(queryset, "all"):
                queryset = queryset.all()
            sources.append((view.__name__, queryset))
        if not sources:
            raise AssertionError(
                f"{cls.__name__} could not find any list-style queryset source for "
                f"{cls.table.__module__}.{cls.table.__name__}; "
                "set `queryset_sources` explicitly if needed."
            )
        return tuple(sources)

    def iter_orderable_columns(self, queryset):
        for column in self.get_table(queryset).columns:
            if not column.orderable:
                continue
            if column.name in self.excluded_orderable_columns:
                continue
            yield column.name


class TableTestCases:
    """
    Keep test_* methods nested to avoid unittest auto-discovering the reusable
    base classes directly.
    """

    class OrderableColumnsTestCase(ModelTableTestCase):
        @classmethod
        def setUpClass(cls):
            super().setUpClass()
            cls.validate_table_test_case()

        def test_every_orderable_column_renders(self):
            request = self.get_request()
            for source_name, queryset in self.get_queryset_sources():
                for column_name in self.iter_orderable_columns(queryset):
                    for direction, prefix in (("asc", ""), ("desc", "-")):
                        with self.subTest(
                            source=source_name,
                            column=column_name,
                            direction=direction,
                        ):
                            table = self.get_table(queryset)
                            table.order_by = f"{prefix}{column_name}"
                            table.as_html(request)
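The queryset-source discovery above can be illustrated with plain classes. `ListView`, `WidgetTable`, and the view classes below are hypothetical stand-ins for `generic.ObjectListView` and a real NetBox table/view, used only to show how non-list views that share the same table are filtered out:

```python
import inspect


class ListView:  # stand-in for generic.ObjectListView
    pass


class WidgetTable:  # stand-in for a model-backed table class
    pass


class WidgetListView(ListView):
    table = WidgetTable
    queryset = ['w1', 'w2']  # stand-in queryset


class WidgetBulkDeleteView:  # declares the table, but is not a list view
    table = WidgetTable
    queryset = ['w1', 'w2']


def discover_sources(views, table, view_base=ListView):
    # Mirrors the filter in is_queryset_source_view(): keep only
    # list-style views that declare this table and expose a queryset.
    return [
        (view.__name__, view.queryset)
        for view in views
        if inspect.isclass(view)
        and getattr(view, 'table', None) is table
        and getattr(view, 'queryset', None) is not None
        and issubclass(view, view_base)
    ]


print(discover_sources([WidgetListView, WidgetBulkDeleteView], WidgetTable))
# [('WidgetListView', ['w1', 'w2'])]
```

The bulk-delete view is excluded solely by the `issubclass` check, which is what keeps confirmation tables out of the ordering smoke test.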


@@ -6,8 +6,11 @@ from django.template import Context
from django.test import RequestFactory, TestCase
import dcim.filtersets # noqa: F401 - Import to register Device filterset
-from dcim.forms.filtersets import DeviceFilterForm
-from dcim.models import Device
+from core.models import ObjectType
+from dcim.forms.filtersets import DeviceFilterForm, SiteFilterForm
+from dcim.models import Device, Manufacturer, Site
+from extras.choices import CustomFieldTypeChoices
+from extras.models import CustomField
from netbox.filtersets import BaseFilterSet
from tenancy.models import Tenant
from users.models import User
@@ -338,3 +341,70 @@ class EmptyLookupTest(TestCase):
        self.assertGreater(len(result['applied_filters']), 0)
        filter_pill = result['applied_filters'][0]
        self.assertIn('not empty', filter_pill['link_text'].lower())


class ObjectCustomFieldEmptyLookupTest(TestCase):
    """
    Regression test for https://github.com/netbox-community/netbox/issues/21535.

    Rendering a filter form with an object-type custom field and the __empty modifier
    must not raise a ValueError or produce a form validation error. Filter pills must
    still appear for the empty modifier.
    """
    @classmethod
    def setUpTestData(cls):
        cls.user = User.objects.create(username='test_user_obj_cf')
        site_type = ObjectType.objects.get_for_model(Site)
        cf = CustomField(
            name='test_obj_cf',
            type=CustomFieldTypeChoices.TYPE_OBJECT,
            related_object_type=ObjectType.objects.get_for_model(Manufacturer),
        )
        cf.save()
        cf.object_types.set([site_type])

    def _make_form_and_result(self, querystring):
        query_params = QueryDict(querystring)
        form = SiteFilterForm(query_params)
        request = RequestFactory().get('/', query_params)
        request.user = self.user
        context = Context({'request': request})
        result = applied_filters(context, Site, form, query_params)
        return form, result

    def test_render_form_with_empty_true_no_error(self):
        """Rendering SiteFilterForm with cf__empty=true must not raise ValueError."""
        query_params = QueryDict('cf_test_obj_cf__empty=true')
        form = SiteFilterForm(query_params)
        try:
            str(form['cf_test_obj_cf'])
        except ValueError as e:
            self.fail(f"Rendering object-type custom field with __empty=true raised ValueError: {e}")

    def test_render_form_with_empty_false_no_error(self):
        """Rendering SiteFilterForm with cf__empty=false must not raise ValueError."""
        query_params = QueryDict('cf_test_obj_cf__empty=false')
        form = SiteFilterForm(query_params)
        try:
            str(form['cf_test_obj_cf'])
        except ValueError as e:
            self.fail(f"Rendering object-type custom field with __empty=false raised ValueError: {e}")

    def test_no_validation_error_on_empty_true(self):
        """The filter form must not have a validation error for the field when __empty=true."""
        form, _ = self._make_form_and_result('cf_test_obj_cf__empty=true')
        form.is_valid()
        self.assertNotIn('cf_test_obj_cf', form.errors)

    def test_filter_pill_appears_for_empty_true(self):
        """A filter pill showing 'is empty' must be generated for an object-type CF with __empty=true."""
        _, result = self._make_form_and_result('cf_test_obj_cf__empty=true')
        self.assertGreater(len(result['applied_filters']), 0)
        self.assertIn('empty', result['applied_filters'][0]['link_text'].lower())

    def test_filter_pill_appears_for_empty_false(self):
        """A filter pill showing 'is not empty' must be generated for an object-type CF with __empty=false."""
        _, result = self._make_form_and_result('cf_test_obj_cf__empty=false')
        self.assertGreater(len(result['applied_filters']), 0)
        self.assertIn('not empty', result['applied_filters'][0]['link_text'].lower())


@@ -6,7 +6,12 @@ from netbox.choices import ImportFormatChoices
from utilities.forms.bulk_import import BulkImportForm
from utilities.forms.fields.csv import CSVSelectWidget
from utilities.forms.forms import BulkRenameForm
-from utilities.forms.utils import expand_alphanumeric_pattern, expand_ipaddress_pattern, get_field_value
+from utilities.forms.utils import (
+    expand_alphanumeric_pattern,
+    expand_ipaddress_pattern,
+    get_capacity_unit_label,
+    get_field_value,
+)
from utilities.forms.widgets.select import AvailableOptions, SelectedOptions
@@ -550,3 +555,15 @@ class SelectMultipleWidgetTest(TestCase):
        self.assertEqual(widget.choices[0][1], [(2, 'Option 2')])
        self.assertEqual(widget.choices[1][0], 'Group B')
        self.assertEqual(widget.choices[1][1], [(3, 'Option 3')])


class GetCapacityUnitLabelTest(TestCase):
    """
    Test the get_capacity_unit_label function for correct base unit label.
    """
    def test_si_label(self):
        self.assertEqual(get_capacity_unit_label(1000), 'MB')

    def test_iec_label(self):
        self.assertEqual(get_capacity_unit_label(1024), 'MiB')


@@ -3,6 +3,7 @@ from unittest.mock import patch
from django.test import TestCase, override_settings
from utilities.templatetags.builtins.tags import static_with_params
from utilities.templatetags.helpers import _humanize_capacity
class StaticWithParamsTest(TestCase):
@@ -46,3 +47,46 @@ class StaticWithParamsTest(TestCase):
        # Check that new parameter value is used
        self.assertIn('v=new_version', result)
        self.assertNotIn('v=old_version', result)


class HumanizeCapacityTest(TestCase):
    """
    Test the _humanize_capacity function for correct SI/IEC unit label selection.
    """
    # Tests with divisor=1000 (SI/decimal units)
    def test_si_megabytes(self):
        self.assertEqual(_humanize_capacity(500, divisor=1000), '500 MB')

    def test_si_gigabytes(self):
        self.assertEqual(_humanize_capacity(2000, divisor=1000), '2.00 GB')

    def test_si_terabytes(self):
        self.assertEqual(_humanize_capacity(2000000, divisor=1000), '2.00 TB')

    def test_si_petabytes(self):
        self.assertEqual(_humanize_capacity(2000000000, divisor=1000), '2.00 PB')

    # Tests with divisor=1024 (IEC/binary units)
    def test_iec_megabytes(self):
        self.assertEqual(_humanize_capacity(500, divisor=1024), '500 MiB')

    def test_iec_gigabytes(self):
        self.assertEqual(_humanize_capacity(2048, divisor=1024), '2.00 GiB')

    def test_iec_terabytes(self):
        self.assertEqual(_humanize_capacity(2097152, divisor=1024), '2.00 TiB')

    def test_iec_petabytes(self):
        self.assertEqual(_humanize_capacity(2147483648, divisor=1024), '2.00 PiB')

    # Edge cases
    def test_empty_value(self):
        self.assertEqual(_humanize_capacity(0, divisor=1000), '')
        self.assertEqual(_humanize_capacity(None, divisor=1000), '')

    def test_default_divisor_is_1000(self):
        self.assertEqual(_humanize_capacity(2000), '2.00 GB')


@@ -1,4 +1,5 @@
from django import forms
from django.conf import settings
from django.utils.translation import gettext_lazy as _
from dcim.choices import InterfaceModeChoices
@@ -13,6 +14,7 @@ from tenancy.models import Tenant
from utilities.forms import BulkRenameForm, add_blank_choice
from utilities.forms.fields import DynamicModelChoiceField, DynamicModelMultipleChoiceField
from utilities.forms.rendering import FieldSet
from utilities.forms.utils import get_capacity_unit_label
from utilities.forms.widgets import BulkEditNullBooleanSelect
from virtualization.choices import *
from virtualization.models import *
@@ -138,11 +140,11 @@ class VirtualMachineBulkEditForm(PrimaryModelBulkEditForm):
)
memory = forms.IntegerField(
required=False,
-        label=_('Memory (MB)')
+        label=_('Memory')
)
disk = forms.IntegerField(
required=False,
-        label=_('Disk (MB)')
+        label=_('Disk')
)
config_template = DynamicModelChoiceField(
queryset=ConfigTemplate.objects.all(),
@@ -159,6 +161,13 @@ class VirtualMachineBulkEditForm(PrimaryModelBulkEditForm):
'site', 'cluster', 'device', 'role', 'tenant', 'platform', 'vcpus', 'memory', 'disk', 'description', 'comments',
)
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+        # Set unit labels based on configured RAM_BASE_UNIT / DISK_BASE_UNIT (MB vs MiB)
+        self.fields['memory'].label = _('Memory ({unit})').format(unit=get_capacity_unit_label(settings.RAM_BASE_UNIT))
+        self.fields['disk'].label = _('Disk ({unit})').format(unit=get_capacity_unit_label(settings.DISK_BASE_UNIT))
class VMInterfaceBulkEditForm(OwnerMixin, NetBoxModelBulkEditForm):
virtual_machine = forms.ModelChoiceField(
@@ -304,7 +313,7 @@ class VirtualDiskBulkEditForm(OwnerMixin, NetBoxModelBulkEditForm):
)
size = forms.IntegerField(
required=False,
-        label=_('Size (MB)')
+        label=_('Size')
)
description = forms.CharField(
label=_('Description'),
@@ -318,6 +327,12 @@ class VirtualDiskBulkEditForm(OwnerMixin, NetBoxModelBulkEditForm):
)
nullable_fields = ('description',)
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+        # Set unit label based on configured DISK_BASE_UNIT (MB vs MiB)
+        self.fields['size'].label = _('Size ({unit})').format(unit=get_capacity_unit_label(settings.DISK_BASE_UNIT))
class VirtualDiskBulkRenameForm(BulkRenameForm):
pk = forms.ModelMultipleChoiceField(


@@ -1,4 +1,5 @@
from django import forms
from django.conf import settings
from django.utils.translation import gettext_lazy as _
from dcim.choices import *
@@ -12,6 +13,7 @@ from tenancy.forms import ContactModelFilterForm, TenancyFilterForm
from utilities.forms import BOOLEAN_WITH_BLANK_CHOICES
from utilities.forms.fields import DynamicModelMultipleChoiceField, TagFilterField
from utilities.forms.rendering import FieldSet
from utilities.forms.utils import get_capacity_unit_label
from virtualization.choices import *
from virtualization.models import *
from vpn.models import L2VPN
@@ -281,8 +283,14 @@ class VirtualDiskFilterForm(OwnerFilterMixin, NetBoxModelFilterSetForm):
label=_('Virtual machine')
)
size = forms.IntegerField(
-        label=_('Size (MB)'),
+        label=_('Size'),
required=False,
min_value=1
)
tag = TagFilterField(model)
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+        # Set unit label based on configured DISK_BASE_UNIT (MB vs MiB)
+        self.fields['size'].label = _('Size ({unit})').format(unit=get_capacity_unit_label(settings.DISK_BASE_UNIT))


@@ -1,5 +1,6 @@
from django import forms
from django.apps import apps
from django.conf import settings
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from django.utils.translation import gettext_lazy as _
@@ -16,6 +17,7 @@ from tenancy.forms import TenancyForm
from utilities.forms import ConfirmationForm
from utilities.forms.fields import DynamicModelChoiceField, DynamicModelMultipleChoiceField, JSONField
from utilities.forms.rendering import FieldSet
from utilities.forms.utils import get_capacity_unit_label
from utilities.forms.widgets import HTMXSelect
from virtualization.models import *
@@ -236,6 +238,10 @@ class VirtualMachineForm(TenancyForm, PrimaryModelForm):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
+
+        # Set unit labels based on configured RAM_BASE_UNIT / DISK_BASE_UNIT (MB vs MiB)
+        self.fields['memory'].label = _('Memory ({unit})').format(unit=get_capacity_unit_label(settings.RAM_BASE_UNIT))
+        self.fields['disk'].label = _('Disk ({unit})').format(unit=get_capacity_unit_label(settings.DISK_BASE_UNIT))
if self.instance.pk:
# Disable the disk field if one or more VirtualDisks have been created
@@ -401,3 +407,9 @@ class VirtualDiskForm(VMComponentForm):
fields = [
'virtual_machine', 'name', 'size', 'description', 'owner', 'tags',
]
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+        # Set unit label based on configured DISK_BASE_UNIT (MB vs MiB)
+        self.fields['size'].label = _('Size ({unit})').format(unit=get_capacity_unit_label(settings.DISK_BASE_UNIT))


@@ -121,12 +121,12 @@ class VirtualMachine(ContactsMixin, ImageAttachmentsMixin, RenderConfigMixin, Co
memory = models.PositiveIntegerField(
blank=True,
null=True,
-        verbose_name=_('memory (MB)')
+        verbose_name=_('memory')
)
disk = models.PositiveIntegerField(
blank=True,
null=True,
-        verbose_name=_('disk (MB)')
+        verbose_name=_('disk')
)
serial = models.CharField(
verbose_name=_('serial number'),
@@ -425,7 +425,7 @@ class VMInterface(ComponentModel, BaseInterface, TrackingModelMixin):
class VirtualDisk(ComponentModel, TrackingModelMixin):
size = models.PositiveIntegerField(
-        verbose_name=_('size (MB)'),
+        verbose_name=_('size'),
)
class Meta(ComponentModel.Meta):


@@ -4,7 +4,7 @@ from django.utils.translation import gettext_lazy as _
from dcim.tables.devices import BaseInterfaceTable
from netbox.tables import NetBoxTable, PrimaryModelTable, columns
from tenancy.tables import ContactsColumnMixin, TenancyColumnsMixin
-from utilities.templatetags.helpers import humanize_disk_megabytes
+from utilities.templatetags.helpers import humanize_disk_capacity, humanize_ram_capacity
from virtualization.models import VirtualDisk, VirtualMachine, VMInterface
from .template_code import *
@@ -93,8 +93,11 @@ class VirtualMachineTable(TenancyColumnsMixin, ContactsColumnMixin, PrimaryModel
'pk', 'name', 'status', 'site', 'cluster', 'role', 'tenant', 'vcpus', 'memory', 'disk', 'primary_ip',
)
+    def render_memory(self, value):
+        return humanize_ram_capacity(value)
+
     def render_disk(self, value):
-        return humanize_disk_megabytes(value)
+        return humanize_disk_capacity(value)
#
@@ -184,7 +187,7 @@ class VirtualDiskTable(NetBoxTable):
}
     def render_size(self, value):
-        return humanize_disk_megabytes(value)
+        return humanize_disk_capacity(value)
class VirtualMachineVirtualDiskTable(VirtualDiskTable):


@@ -0,0 +1,26 @@
from utilities.testing import TableTestCases
from virtualization.tables import *


class ClusterTypeTableTest(TableTestCases.OrderableColumnsTestCase):
    table = ClusterTypeTable


class ClusterGroupTableTest(TableTestCases.OrderableColumnsTestCase):
    table = ClusterGroupTable


class ClusterTableTest(TableTestCases.OrderableColumnsTestCase):
    table = ClusterTable


class VirtualMachineTableTest(TableTestCases.OrderableColumnsTestCase):
    table = VirtualMachineTable


class VMInterfaceTableTest(TableTestCases.OrderableColumnsTestCase):
    table = VMInterfaceTable


class VirtualDiskTableTest(TableTestCases.OrderableColumnsTestCase):
    table = VirtualDiskTable


@@ -66,7 +66,7 @@ class TunnelTable(TenancyColumnsMixin, ContactsColumnMixin, PrimaryModelTable):
model = Tunnel
fields = (
'pk', 'id', 'name', 'group', 'status', 'encapsulation', 'ipsec_profile', 'tenant', 'tenant_group',
-        'tunnel_id', 'termination_count', 'description', 'contacts', 'comments', 'tags', 'created',
+        'tunnel_id', 'terminations_count', 'description', 'contacts', 'comments', 'tags', 'created',
'last_updated',
)
default_columns = ('pk', 'name', 'group', 'status', 'encapsulation', 'tenant', 'terminations_count')


@@ -1,23 +1,42 @@
-from django.test import RequestFactory, TestCase, tag
-
-from vpn.models import TunnelTermination
-from vpn.tables import TunnelTerminationTable
+from utilities.testing import TableTestCases
+from vpn.tables import *


-@tag('regression')
-class TunnelTerminationTableTest(TestCase):
-    def test_every_orderable_field_does_not_throw_exception(self):
-        terminations = TunnelTermination.objects.all()
-        fake_request = RequestFactory().get("/")
-        disallowed = {'actions'}
-
-        orderable_columns = [
-            column.name for column in TunnelTerminationTable(terminations).columns
-            if column.orderable and column.name not in disallowed
-        ]
-        for col in orderable_columns:
-            for dir in ('-', ''):
-                table = TunnelTerminationTable(terminations)
-                table.order_by = f'{dir}{col}'
-                table.as_html(fake_request)
+class TunnelGroupTableTest(TableTestCases.OrderableColumnsTestCase):
+    table = TunnelGroupTable
+
+
+class TunnelTableTest(TableTestCases.OrderableColumnsTestCase):
+    table = TunnelTable
+
+
+class TunnelTerminationTableTest(TableTestCases.OrderableColumnsTestCase):
+    table = TunnelTerminationTable
+
+
+class IKEProposalTableTest(TableTestCases.OrderableColumnsTestCase):
+    table = IKEProposalTable
+
+
+class IKEPolicyTableTest(TableTestCases.OrderableColumnsTestCase):
+    table = IKEPolicyTable
+
+
+class IPSecProposalTableTest(TableTestCases.OrderableColumnsTestCase):
+    table = IPSecProposalTable
+
+
+class IPSecPolicyTableTest(TableTestCases.OrderableColumnsTestCase):
+    table = IPSecPolicyTable
+
+
+class IPSecProfileTableTest(TableTestCases.OrderableColumnsTestCase):
+    table = IPSecProfileTable
+
+
+class L2VPNTableTest(TableTestCases.OrderableColumnsTestCase):
+    table = L2VPNTable
+
+
+class L2VPNTerminationTableTest(TableTestCases.OrderableColumnsTestCase):
+    table = L2VPNTerminationTable


@@ -0,0 +1,14 @@
from utilities.testing import TableTestCases
from wireless.tables import *


class WirelessLANGroupTableTest(TableTestCases.OrderableColumnsTestCase):
    table = WirelessLANGroupTable


class WirelessLANTableTest(TableTestCases.OrderableColumnsTestCase):
    table = WirelessLANTable


class WirelessLinkTableTest(TableTestCases.OrderableColumnsTestCase):
    table = WirelessLinkTable


@@ -10,7 +10,7 @@ django-pglocks==1.0.4
django-prometheus==2.4.1
django-redis==6.0.0
django-rich==2.2.0
-django-rq==3.2.2
+django-rq==4.0.1
django-storages==1.14.6
django-tables2==2.8.0
django-taggit==6.1.0