mirror of
https://github.com/netbox-community/netbox.git
synced 2026-04-03 16:07:18 +02:00
Compare commits
11 Commits
21770-embe
...
21766-impr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
209c60ea6e | ||
|
|
f058ee3d60 | ||
|
|
49ba0dd495 | ||
|
|
b4ee2cf447 | ||
|
|
34098bb20a | ||
|
|
a19daa5466 | ||
|
|
40eec679d9 | ||
|
|
57556e3fdb | ||
|
|
f2d8ae29c2 | ||
|
|
f6eb5dda0f | ||
|
|
c7bbfb24c5 |
@@ -384,6 +384,18 @@ A calendar date. Returns a `datetime.date` object.
|
||||
|
||||
A complete date & time. Returns a `datetime.datetime` object.
|
||||
|
||||
## Uploading Scripts via the API
|
||||
|
||||
Script modules can be uploaded to NetBox via the REST API by sending a `multipart/form-data` POST request to `/api/extras/scripts/upload/`. The caller must have the `extras.add_scriptmodule` and `core.add_managedfile` permissions.
|
||||
|
||||
```no-highlight
|
||||
curl -X POST \
|
||||
-H "Authorization: Token $TOKEN" \
|
||||
-H "Accept: application/json; indent=4" \
|
||||
-F "file=@/path/to/myscript.py" \
|
||||
http://netbox/api/extras/scripts/upload/
|
||||
```
|
||||
|
||||
## Running Custom Scripts
|
||||
|
||||
!!! note
|
||||
|
||||
@@ -95,6 +95,7 @@ class VirtualCircuitTerminationTable(NetBoxTable):
|
||||
verbose_name=_('Provider network')
|
||||
)
|
||||
provider_account = tables.Column(
|
||||
accessor=tables.A('virtual_circuit__provider_account'),
|
||||
linkify=True,
|
||||
verbose_name=_('Account')
|
||||
)
|
||||
@@ -112,7 +113,7 @@ class VirtualCircuitTerminationTable(NetBoxTable):
|
||||
class Meta(NetBoxTable.Meta):
|
||||
model = VirtualCircuitTermination
|
||||
fields = (
|
||||
'pk', 'id', 'virtual_circuit', 'provider', 'provider_network', 'provider_account', 'role', 'interfaces',
|
||||
'pk', 'id', 'virtual_circuit', 'provider', 'provider_network', 'provider_account', 'role', 'interface',
|
||||
'description', 'created', 'last_updated', 'actions',
|
||||
)
|
||||
default_columns = (
|
||||
|
||||
@@ -1,48 +1,46 @@
|
||||
from django.test import RequestFactory, TestCase, tag
|
||||
|
||||
from circuits.models import CircuitGroupAssignment, CircuitTermination
|
||||
from circuits.tables import CircuitGroupAssignmentTable, CircuitTerminationTable
|
||||
from circuits.tables import *
|
||||
from utilities.testing import TableTestCases
|
||||
|
||||
|
||||
@tag('regression')
|
||||
class CircuitTerminationTableTest(TestCase):
|
||||
def test_every_orderable_field_does_not_throw_exception(self):
|
||||
terminations = CircuitTermination.objects.all()
|
||||
disallowed = {
|
||||
'actions',
|
||||
}
|
||||
|
||||
orderable_columns = [
|
||||
column.name
|
||||
for column in CircuitTerminationTable(terminations).columns
|
||||
if column.orderable and column.name not in disallowed
|
||||
]
|
||||
fake_request = RequestFactory().get('/')
|
||||
|
||||
for col in orderable_columns:
|
||||
for direction in ('-', ''):
|
||||
table = CircuitTerminationTable(terminations)
|
||||
table.order_by = f'{direction}{col}'
|
||||
table.as_html(fake_request)
|
||||
class CircuitTypeTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = CircuitTypeTable
|
||||
|
||||
|
||||
@tag('regression')
|
||||
class CircuitGroupAssignmentTableTest(TestCase):
|
||||
def test_every_orderable_field_does_not_throw_exception(self):
|
||||
assignment = CircuitGroupAssignment.objects.all()
|
||||
disallowed = {
|
||||
'actions',
|
||||
}
|
||||
class CircuitTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = CircuitTable
|
||||
|
||||
orderable_columns = [
|
||||
column.name
|
||||
for column in CircuitGroupAssignmentTable(assignment).columns
|
||||
if column.orderable and column.name not in disallowed
|
||||
]
|
||||
fake_request = RequestFactory().get('/')
|
||||
|
||||
for col in orderable_columns:
|
||||
for direction in ('-', ''):
|
||||
table = CircuitGroupAssignmentTable(assignment)
|
||||
table.order_by = f'{direction}{col}'
|
||||
table.as_html(fake_request)
|
||||
class CircuitTerminationTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = CircuitTerminationTable
|
||||
|
||||
|
||||
class CircuitGroupTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = CircuitGroupTable
|
||||
|
||||
|
||||
class CircuitGroupAssignmentTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = CircuitGroupAssignmentTable
|
||||
|
||||
|
||||
class ProviderTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = ProviderTable
|
||||
|
||||
|
||||
class ProviderAccountTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = ProviderAccountTable
|
||||
|
||||
|
||||
class ProviderNetworkTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = ProviderNetworkTable
|
||||
|
||||
|
||||
class VirtualCircuitTypeTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = VirtualCircuitTypeTable
|
||||
|
||||
|
||||
class VirtualCircuitTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = VirtualCircuitTable
|
||||
|
||||
|
||||
class VirtualCircuitTerminationTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = VirtualCircuitTerminationTable
|
||||
|
||||
@@ -2,7 +2,7 @@ from django.http import Http404, HttpResponse
|
||||
from django.shortcuts import get_object_or_404
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from django_rq.queues import get_redis_connection
|
||||
from django_rq.settings import QUEUES_LIST
|
||||
from django_rq.settings import get_queues_list
|
||||
from django_rq.utils import get_statistics
|
||||
from drf_spectacular.types import OpenApiTypes
|
||||
from drf_spectacular.utils import OpenApiParameter, extend_schema
|
||||
@@ -195,7 +195,7 @@ class BackgroundWorkerViewSet(BaseRQViewSet):
|
||||
return 'Background Workers'
|
||||
|
||||
def get_data(self):
|
||||
config = QUEUES_LIST[0]
|
||||
config = get_queues_list()[0]
|
||||
return Worker.all(get_redis_connection(config['connection_config']))
|
||||
|
||||
@extend_schema(
|
||||
@@ -205,7 +205,7 @@ class BackgroundWorkerViewSet(BaseRQViewSet):
|
||||
)
|
||||
def retrieve(self, request, name):
|
||||
# all the RQ queues should use the same connection
|
||||
config = QUEUES_LIST[0]
|
||||
config = get_queues_list()[0]
|
||||
workers = Worker.all(get_redis_connection(config['connection_config']))
|
||||
worker = next((item for item in workers if item.name == name), None)
|
||||
if not worker:
|
||||
@@ -229,7 +229,7 @@ class BackgroundTaskViewSet(BaseRQViewSet):
|
||||
return get_rq_jobs()
|
||||
|
||||
def get_task_from_id(self, task_id):
|
||||
config = QUEUES_LIST[0]
|
||||
config = get_queues_list()[0]
|
||||
task = RQ_Job.fetch(task_id, connection=get_redis_connection(config['connection_config']))
|
||||
if not task:
|
||||
raise Http404
|
||||
|
||||
@@ -19,6 +19,7 @@ REVISION_BUTTONS = """
|
||||
class ConfigRevisionTable(NetBoxTable):
|
||||
is_active = columns.BooleanColumn(
|
||||
verbose_name=_('Is Active'),
|
||||
accessor='active',
|
||||
false_mark=None
|
||||
)
|
||||
actions = columns.ActionsColumn(
|
||||
|
||||
26
netbox/core/tests/test_tables.py
Normal file
26
netbox/core/tests/test_tables.py
Normal file
@@ -0,0 +1,26 @@
|
||||
from core.models import ObjectChange
|
||||
from core.tables import *
|
||||
from utilities.testing import TableTestCases
|
||||
|
||||
|
||||
class DataSourceTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = DataSourceTable
|
||||
|
||||
|
||||
class DataFileTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = DataFileTable
|
||||
|
||||
|
||||
class JobTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = JobTable
|
||||
|
||||
|
||||
class ObjectChangeTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = ObjectChangeTable
|
||||
queryset_sources = [
|
||||
('ObjectChangeListView', ObjectChange.objects.valid_models()),
|
||||
]
|
||||
|
||||
|
||||
class ConfigRevisionTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = ConfigRevisionTable
|
||||
@@ -6,7 +6,7 @@ from datetime import datetime
|
||||
from django.urls import reverse
|
||||
from django.utils import timezone
|
||||
from django_rq import get_queue
|
||||
from django_rq.settings import QUEUES_MAP
|
||||
from django_rq.settings import get_queues_map
|
||||
from django_rq.workers import get_worker
|
||||
from rq.job import Job as RQ_Job
|
||||
from rq.job import JobStatus
|
||||
@@ -189,7 +189,7 @@ class BackgroundTaskTestCase(TestCase):
|
||||
def test_background_tasks_list_default(self):
|
||||
queue = get_queue('default')
|
||||
queue.enqueue(self.dummy_job_default)
|
||||
queue_index = QUEUES_MAP['default']
|
||||
queue_index = get_queues_map()['default']
|
||||
|
||||
response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'queued']))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
@@ -198,7 +198,7 @@ class BackgroundTaskTestCase(TestCase):
|
||||
def test_background_tasks_list_high(self):
|
||||
queue = get_queue('high')
|
||||
queue.enqueue(self.dummy_job_high)
|
||||
queue_index = QUEUES_MAP['high']
|
||||
queue_index = get_queues_map()['high']
|
||||
|
||||
response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'queued']))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
@@ -207,7 +207,7 @@ class BackgroundTaskTestCase(TestCase):
|
||||
def test_background_tasks_list_finished(self):
|
||||
queue = get_queue('default')
|
||||
job = queue.enqueue(self.dummy_job_default)
|
||||
queue_index = QUEUES_MAP['default']
|
||||
queue_index = get_queues_map()['default']
|
||||
|
||||
registry = FinishedJobRegistry(queue.name, queue.connection)
|
||||
registry.add(job, 2)
|
||||
@@ -218,7 +218,7 @@ class BackgroundTaskTestCase(TestCase):
|
||||
def test_background_tasks_list_failed(self):
|
||||
queue = get_queue('default')
|
||||
job = queue.enqueue(self.dummy_job_default)
|
||||
queue_index = QUEUES_MAP['default']
|
||||
queue_index = get_queues_map()['default']
|
||||
|
||||
registry = FailedJobRegistry(queue.name, queue.connection)
|
||||
registry.add(job, 2)
|
||||
@@ -229,7 +229,7 @@ class BackgroundTaskTestCase(TestCase):
|
||||
def test_background_tasks_scheduled(self):
|
||||
queue = get_queue('default')
|
||||
queue.enqueue_at(datetime.now(), self.dummy_job_default)
|
||||
queue_index = QUEUES_MAP['default']
|
||||
queue_index = get_queues_map()['default']
|
||||
|
||||
response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'scheduled']))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
@@ -238,7 +238,7 @@ class BackgroundTaskTestCase(TestCase):
|
||||
def test_background_tasks_list_deferred(self):
|
||||
queue = get_queue('default')
|
||||
job = queue.enqueue(self.dummy_job_default)
|
||||
queue_index = QUEUES_MAP['default']
|
||||
queue_index = get_queues_map()['default']
|
||||
|
||||
registry = DeferredJobRegistry(queue.name, queue.connection)
|
||||
registry.add(job, 2)
|
||||
@@ -335,7 +335,7 @@ class BackgroundTaskTestCase(TestCase):
|
||||
worker2 = get_worker('high')
|
||||
worker2.register_birth()
|
||||
|
||||
queue_index = QUEUES_MAP['default']
|
||||
queue_index = get_queues_map()['default']
|
||||
response = self.client.get(reverse('core:worker_list', args=[queue_index]))
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertIn(str(worker1.name), str(response.content))
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
from django.http import Http404
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from django_rq.queues import get_queue, get_queue_by_index, get_redis_connection
|
||||
from django_rq.settings import QUEUES_LIST, QUEUES_MAP
|
||||
from django_rq.settings import get_queues_list, get_queues_map
|
||||
from django_rq.utils import get_jobs, stop_jobs
|
||||
from rq import requeue_job
|
||||
from rq.exceptions import NoSuchJobError
|
||||
@@ -31,7 +31,7 @@ def get_rq_jobs():
|
||||
"""
|
||||
jobs = set()
|
||||
|
||||
for queue in QUEUES_LIST:
|
||||
for queue in get_queues_list():
|
||||
queue = get_queue(queue['name'])
|
||||
jobs.update(queue.get_jobs())
|
||||
|
||||
@@ -78,13 +78,13 @@ def delete_rq_job(job_id):
|
||||
"""
|
||||
Delete the specified RQ job.
|
||||
"""
|
||||
config = QUEUES_LIST[0]
|
||||
config = get_queues_list()[0]
|
||||
try:
|
||||
job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
|
||||
except NoSuchJobError:
|
||||
raise Http404(_("Job {job_id} not found").format(job_id=job_id))
|
||||
|
||||
queue_index = QUEUES_MAP[job.origin]
|
||||
queue_index = get_queues_map()[job.origin]
|
||||
queue = get_queue_by_index(queue_index)
|
||||
|
||||
# Remove job id from queue and delete the actual job
|
||||
@@ -96,13 +96,13 @@ def requeue_rq_job(job_id):
|
||||
"""
|
||||
Requeue the specified RQ job.
|
||||
"""
|
||||
config = QUEUES_LIST[0]
|
||||
config = get_queues_list()[0]
|
||||
try:
|
||||
job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
|
||||
except NoSuchJobError:
|
||||
raise Http404(_("Job {id} not found.").format(id=job_id))
|
||||
|
||||
queue_index = QUEUES_MAP[job.origin]
|
||||
queue_index = get_queues_map()[job.origin]
|
||||
queue = get_queue_by_index(queue_index)
|
||||
|
||||
requeue_job(job_id, connection=queue.connection, serializer=queue.serializer)
|
||||
@@ -112,13 +112,13 @@ def enqueue_rq_job(job_id):
|
||||
"""
|
||||
Enqueue the specified RQ job.
|
||||
"""
|
||||
config = QUEUES_LIST[0]
|
||||
config = get_queues_list()[0]
|
||||
try:
|
||||
job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
|
||||
except NoSuchJobError:
|
||||
raise Http404(_("Job {id} not found.").format(id=job_id))
|
||||
|
||||
queue_index = QUEUES_MAP[job.origin]
|
||||
queue_index = get_queues_map()[job.origin]
|
||||
queue = get_queue_by_index(queue_index)
|
||||
|
||||
try:
|
||||
@@ -144,13 +144,13 @@ def stop_rq_job(job_id):
|
||||
"""
|
||||
Stop the specified RQ job.
|
||||
"""
|
||||
config = QUEUES_LIST[0]
|
||||
config = get_queues_list()[0]
|
||||
try:
|
||||
job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
|
||||
except NoSuchJobError:
|
||||
raise Http404(_("Job {job_id} not found").format(job_id=job_id))
|
||||
|
||||
queue_index = QUEUES_MAP[job.origin]
|
||||
queue_index = get_queues_map()[job.origin]
|
||||
queue = get_queue_by_index(queue_index)
|
||||
|
||||
return stop_jobs(queue, job_id)[0]
|
||||
|
||||
@@ -14,7 +14,7 @@ from django.urls import reverse
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from django.views.generic import View
|
||||
from django_rq.queues import get_connection, get_queue_by_index, get_redis_connection
|
||||
from django_rq.settings import QUEUES_LIST, QUEUES_MAP
|
||||
from django_rq.settings import get_queues_list, get_queues_map
|
||||
from django_rq.utils import get_statistics
|
||||
from rq.exceptions import NoSuchJobError
|
||||
from rq.job import Job as RQ_Job
|
||||
@@ -524,13 +524,13 @@ class BackgroundTaskView(BaseRQView):
|
||||
|
||||
def get(self, request, job_id):
|
||||
# all the RQ queues should use the same connection
|
||||
config = QUEUES_LIST[0]
|
||||
config = get_queues_list()[0]
|
||||
try:
|
||||
job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
|
||||
except NoSuchJobError:
|
||||
raise Http404(_("Job {job_id} not found").format(job_id=job_id))
|
||||
|
||||
queue_index = QUEUES_MAP[job.origin]
|
||||
queue_index = get_queues_map()[job.origin]
|
||||
queue = get_queue_by_index(queue_index)
|
||||
|
||||
try:
|
||||
@@ -640,7 +640,7 @@ class WorkerView(BaseRQView):
|
||||
|
||||
def get(self, request, key):
|
||||
# all the RQ queues should use the same connection
|
||||
config = QUEUES_LIST[0]
|
||||
config = get_queues_list()[0]
|
||||
worker = Worker.find_by_key('rq:worker:' + key, connection=get_redis_connection(config['connection_config']))
|
||||
# Convert microseconds to milliseconds
|
||||
worker.total_working_time = worker.total_working_time / 1000
|
||||
|
||||
@@ -38,7 +38,15 @@ class ConnectedEndpointsSerializer(serializers.ModelSerializer):
|
||||
|
||||
@extend_schema_field(serializers.BooleanField)
|
||||
def get_connected_endpoints_reachable(self, obj):
|
||||
return obj._path and obj._path.is_complete and obj._path.is_active
|
||||
"""
|
||||
Return whether the connected endpoints are reachable via a complete, active cable path.
|
||||
"""
|
||||
# Use the public `path` accessor rather than dereferencing `_path`
|
||||
# directly. `path` already handles the stale in-memory relation case
|
||||
# that can occur while CablePath rows are rebuilt during cable edits.
|
||||
if path := obj.path:
|
||||
return path.is_complete and path.is_active
|
||||
return False
|
||||
|
||||
|
||||
class PortSerializer(serializers.ModelSerializer):
|
||||
|
||||
@@ -6,8 +6,9 @@ from drf_spectacular.utils import extend_schema_field
|
||||
from rest_framework import serializers
|
||||
|
||||
from dcim.choices import *
|
||||
from dcim.constants import MACADDRESS_ASSIGNMENT_MODELS
|
||||
from dcim.constants import MACADDRESS_ASSIGNMENT_MODELS, MODULE_TOKEN
|
||||
from dcim.models import Device, DeviceBay, MACAddress, Module, VirtualDeviceContext
|
||||
from dcim.utils import get_module_bay_positions, resolve_module_placeholder
|
||||
from extras.api.serializers_.configtemplates import ConfigTemplateSerializer
|
||||
from ipam.api.serializers_.ip import IPAddressSerializer
|
||||
from netbox.api.fields import ChoiceField, ContentTypeField, RelatedObjectCountField
|
||||
@@ -159,6 +160,60 @@ class ModuleSerializer(PrimaryModelSerializer):
|
||||
]
|
||||
brief_fields = ('id', 'url', 'display', 'device', 'module_bay', 'module_type', 'description')
|
||||
|
||||
def validate(self, data):
|
||||
data = super().validate(data)
|
||||
|
||||
if self.nested:
|
||||
return data
|
||||
|
||||
# Skip validation for existing modules (updates)
|
||||
if self.instance is not None:
|
||||
return data
|
||||
|
||||
module_bay = data.get('module_bay')
|
||||
module_type = data.get('module_type')
|
||||
device = data.get('device')
|
||||
|
||||
if not all((module_bay, module_type, device)):
|
||||
return data
|
||||
|
||||
positions = get_module_bay_positions(module_bay)
|
||||
|
||||
for templates, component_attribute in [
|
||||
("consoleporttemplates", "consoleports"),
|
||||
("consoleserverporttemplates", "consoleserverports"),
|
||||
("interfacetemplates", "interfaces"),
|
||||
("powerporttemplates", "powerports"),
|
||||
("poweroutlettemplates", "poweroutlets"),
|
||||
("rearporttemplates", "rearports"),
|
||||
("frontporttemplates", "frontports"),
|
||||
]:
|
||||
installed_components = {
|
||||
component.name: component for component in getattr(device, component_attribute).all()
|
||||
}
|
||||
|
||||
for template in getattr(module_type, templates).all():
|
||||
resolved_name = template.name
|
||||
if MODULE_TOKEN in template.name:
|
||||
if not module_bay.position:
|
||||
raise serializers.ValidationError(
|
||||
_("Cannot install module with placeholder values in a module bay with no position defined.")
|
||||
)
|
||||
try:
|
||||
resolved_name = resolve_module_placeholder(template.name, positions)
|
||||
except ValueError as e:
|
||||
raise serializers.ValidationError(str(e))
|
||||
|
||||
if resolved_name in installed_components:
|
||||
raise serializers.ValidationError(
|
||||
_("A {model} named {name} already exists").format(
|
||||
model=template.component_model.__name__,
|
||||
name=resolved_name
|
||||
)
|
||||
)
|
||||
|
||||
return data
|
||||
|
||||
|
||||
class MACAddressSerializer(PrimaryModelSerializer):
|
||||
assigned_object_type = ContentTypeField(
|
||||
|
||||
@@ -254,6 +254,21 @@ class Trunk8C4PCableProfile(BaseCableProfile):
|
||||
b_connectors = a_connectors
|
||||
|
||||
|
||||
class Breakout1C2Px2C1PCableProfile(BaseCableProfile):
|
||||
a_connectors = {
|
||||
1: 2,
|
||||
}
|
||||
b_connectors = {
|
||||
1: 1,
|
||||
2: 1,
|
||||
}
|
||||
_mapping = {
|
||||
(1, 1): (1, 1),
|
||||
(1, 2): (2, 1),
|
||||
(2, 1): (1, 2),
|
||||
}
|
||||
|
||||
|
||||
class Breakout1C4Px4C1PCableProfile(BaseCableProfile):
|
||||
a_connectors = {
|
||||
1: 4,
|
||||
|
||||
@@ -1776,6 +1776,7 @@ class CableProfileChoices(ChoiceSet):
|
||||
TRUNK_4C8P = 'trunk-4c8p'
|
||||
TRUNK_8C4P = 'trunk-8c4p'
|
||||
# Breakouts
|
||||
BREAKOUT_1C2P_2C1P = 'breakout-1c2p-2c1p'
|
||||
BREAKOUT_1C4P_4C1P = 'breakout-1c4p-4c1p'
|
||||
BREAKOUT_1C6P_6C1P = 'breakout-1c6p-6c1p'
|
||||
BREAKOUT_2C4P_8C1P_SHUFFLE = 'breakout-2c4p-8c1p-shuffle'
|
||||
@@ -1815,6 +1816,7 @@ class CableProfileChoices(ChoiceSet):
|
||||
(
|
||||
_('Breakout'),
|
||||
(
|
||||
(BREAKOUT_1C2P_2C1P, _('1C2P:2C1P breakout')),
|
||||
(BREAKOUT_1C4P_4C1P, _('1C4P:4C1P breakout')),
|
||||
(BREAKOUT_1C6P_6C1P, _('1C6P:6C1P breakout')),
|
||||
(BREAKOUT_2C4P_8C1P_SHUFFLE, _('2C4P:8C1P breakout (shuffle)')),
|
||||
|
||||
@@ -3,6 +3,7 @@ from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from dcim.choices import *
|
||||
from dcim.constants import *
|
||||
from dcim.utils import get_module_bay_positions, resolve_module_placeholder
|
||||
from utilities.forms import get_field_value
|
||||
|
||||
__all__ = (
|
||||
@@ -70,18 +71,6 @@ class InterfaceCommonForm(forms.Form):
|
||||
|
||||
class ModuleCommonForm(forms.Form):
|
||||
|
||||
def _get_module_bay_tree(self, module_bay):
|
||||
module_bays = []
|
||||
while module_bay:
|
||||
module_bays.append(module_bay)
|
||||
if module_bay.module:
|
||||
module_bay = module_bay.module.module_bay
|
||||
else:
|
||||
module_bay = None
|
||||
|
||||
module_bays.reverse()
|
||||
return module_bays
|
||||
|
||||
def clean(self):
|
||||
super().clean()
|
||||
|
||||
@@ -100,7 +89,7 @@ class ModuleCommonForm(forms.Form):
|
||||
self.instance._disable_replication = True
|
||||
return
|
||||
|
||||
module_bays = self._get_module_bay_tree(module_bay)
|
||||
positions = get_module_bay_positions(module_bay)
|
||||
|
||||
for templates, component_attribute in [
|
||||
("consoleporttemplates", "consoleports"),
|
||||
@@ -119,25 +108,16 @@ class ModuleCommonForm(forms.Form):
|
||||
# Get the templates for the module type.
|
||||
for template in getattr(module_type, templates).all():
|
||||
resolved_name = template.name
|
||||
# Installing modules with placeholders require that the bay has a position value
|
||||
if MODULE_TOKEN in template.name:
|
||||
if not module_bay.position:
|
||||
raise forms.ValidationError(
|
||||
_("Cannot install module with placeholder values in a module bay with no position defined.")
|
||||
)
|
||||
|
||||
if len(module_bays) != template.name.count(MODULE_TOKEN):
|
||||
raise forms.ValidationError(
|
||||
_(
|
||||
"Cannot install module with placeholder values in a module bay tree {level} in tree "
|
||||
"but {tokens} placeholders given."
|
||||
).format(
|
||||
level=len(module_bays), tokens=template.name.count(MODULE_TOKEN)
|
||||
)
|
||||
)
|
||||
|
||||
for module_bay in module_bays:
|
||||
resolved_name = resolved_name.replace(MODULE_TOKEN, module_bay.position, 1)
|
||||
try:
|
||||
resolved_name = resolve_module_placeholder(template.name, positions)
|
||||
except ValueError as e:
|
||||
raise forms.ValidationError(str(e))
|
||||
|
||||
existing_item = installed_components.get(resolved_name)
|
||||
|
||||
|
||||
@@ -160,6 +160,7 @@ class Cable(PrimaryModel):
|
||||
CableProfileChoices.TRUNK_4C6P: cable_profiles.Trunk4C6PCableProfile,
|
||||
CableProfileChoices.TRUNK_4C8P: cable_profiles.Trunk4C8PCableProfile,
|
||||
CableProfileChoices.TRUNK_8C4P: cable_profiles.Trunk8C4PCableProfile,
|
||||
CableProfileChoices.BREAKOUT_1C2P_2C1P: cable_profiles.Breakout1C2Px2C1PCableProfile,
|
||||
CableProfileChoices.BREAKOUT_1C4P_4C1P: cable_profiles.Breakout1C4Px4C1PCableProfile,
|
||||
CableProfileChoices.BREAKOUT_1C6P_6C1P: cable_profiles.Breakout1C6Px6C1PCableProfile,
|
||||
CableProfileChoices.BREAKOUT_2C4P_8C1P_SHUFFLE: cable_profiles.Breakout2C4Px8C1PShuffleCableProfile,
|
||||
|
||||
@@ -9,6 +9,7 @@ from dcim.choices import *
|
||||
from dcim.constants import *
|
||||
from dcim.models.base import PortMappingBase
|
||||
from dcim.models.mixins import InterfaceValidationMixin
|
||||
from dcim.utils import get_module_bay_positions, resolve_module_placeholder
|
||||
from netbox.models import ChangeLoggedModel
|
||||
from utilities.fields import ColorField, NaturalOrderingField
|
||||
from utilities.mptt import TreeManager
|
||||
@@ -165,31 +166,15 @@ class ModularComponentTemplateModel(ComponentTemplateModel):
|
||||
_("A component template must be associated with either a device type or a module type.")
|
||||
)
|
||||
|
||||
def _get_module_tree(self, module):
|
||||
modules = []
|
||||
while module:
|
||||
modules.append(module)
|
||||
if module.module_bay:
|
||||
module = module.module_bay.module
|
||||
else:
|
||||
module = None
|
||||
|
||||
modules.reverse()
|
||||
return modules
|
||||
|
||||
def _resolve_module_placeholder(self, value, module):
|
||||
if MODULE_TOKEN not in value or not module:
|
||||
return value
|
||||
modules = self._get_module_tree(module)
|
||||
for m in modules:
|
||||
value = value.replace(MODULE_TOKEN, m.module_bay.position, 1)
|
||||
return value
|
||||
|
||||
def resolve_name(self, module):
|
||||
return self._resolve_module_placeholder(self.name, module)
|
||||
if MODULE_TOKEN not in self.name or not module:
|
||||
return self.name
|
||||
return resolve_module_placeholder(self.name, get_module_bay_positions(module.module_bay))
|
||||
|
||||
def resolve_label(self, module):
|
||||
return self._resolve_module_placeholder(self.label, module)
|
||||
if MODULE_TOKEN not in self.label or not module:
|
||||
return self.label
|
||||
return resolve_module_placeholder(self.label, get_module_bay_positions(module.module_bay))
|
||||
|
||||
|
||||
class ConsolePortTemplate(ModularComponentTemplateModel):
|
||||
@@ -720,7 +705,9 @@ class ModuleBayTemplate(ModularComponentTemplateModel):
|
||||
verbose_name_plural = _('module bay templates')
|
||||
|
||||
def resolve_position(self, module):
|
||||
return self._resolve_module_placeholder(self.position, module)
|
||||
if MODULE_TOKEN not in self.position or not module:
|
||||
return self.position
|
||||
return resolve_module_placeholder(self.position, get_module_bay_positions(module.module_bay))
|
||||
|
||||
def instantiate(self, **kwargs):
|
||||
return self.component_model(
|
||||
|
||||
@@ -2,7 +2,7 @@ from functools import cached_property
|
||||
|
||||
from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation
|
||||
from django.contrib.postgres.fields import ArrayField
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.core.exceptions import ObjectDoesNotExist, ValidationError
|
||||
from django.core.validators import MaxValueValidator, MinValueValidator
|
||||
from django.db import models
|
||||
from django.db.models import Sum
|
||||
@@ -307,11 +307,12 @@ class PathEndpoint(models.Model):
|
||||
|
||||
`connected_endpoints()` is a convenience method for returning the destination of the associated CablePath, if any.
|
||||
"""
|
||||
|
||||
_path = models.ForeignKey(
|
||||
to='dcim.CablePath',
|
||||
on_delete=models.SET_NULL,
|
||||
null=True,
|
||||
blank=True
|
||||
blank=True,
|
||||
)
|
||||
|
||||
class Meta:
|
||||
@@ -323,11 +324,14 @@ class PathEndpoint(models.Model):
|
||||
|
||||
# Construct the complete path (including e.g. bridged interfaces)
|
||||
while origin is not None:
|
||||
|
||||
if origin._path is None:
|
||||
# Go through the public accessor rather than dereferencing `_path`
|
||||
# directly. During cable edits, CablePath rows can be deleted and
|
||||
# recreated while this endpoint instance is still in memory.
|
||||
cable_path = origin.path
|
||||
if cable_path is None:
|
||||
break
|
||||
|
||||
path.extend(origin._path.path_objects)
|
||||
path.extend(cable_path.path_objects)
|
||||
|
||||
# If the path ends at a non-connected pass-through port, pad out the link and far-end terminations
|
||||
if len(path) % 3 == 1:
|
||||
@@ -336,8 +340,8 @@ class PathEndpoint(models.Model):
|
||||
elif len(path) % 3 == 2:
|
||||
path.insert(-1, [])
|
||||
|
||||
# Check for a bridged relationship to continue the trace
|
||||
destinations = origin._path.destinations
|
||||
# Check for a bridged relationship to continue the trace.
|
||||
destinations = cable_path.destinations
|
||||
if len(destinations) == 1:
|
||||
origin = getattr(destinations[0], 'bridge', None)
|
||||
else:
|
||||
@@ -348,14 +352,42 @@ class PathEndpoint(models.Model):
|
||||
|
||||
@property
|
||||
def path(self):
|
||||
return self._path
|
||||
"""
|
||||
Return this endpoint's current CablePath, if any.
|
||||
|
||||
`_path` is a denormalized reference that is updated from CablePath
|
||||
save/delete handlers, including queryset.update() calls on origin
|
||||
endpoints. That means an already-instantiated endpoint can briefly hold
|
||||
a stale in-memory `_path` relation while the database already points to
|
||||
a different CablePath (or to no path at all).
|
||||
|
||||
If the cached relation points to a CablePath that has just been
|
||||
deleted, refresh only the `_path` field from the database and retry.
|
||||
This keeps the fix cheap and narrowly scoped to the denormalized FK.
|
||||
"""
|
||||
if self._path_id is None:
|
||||
return None
|
||||
|
||||
try:
|
||||
return self._path
|
||||
except ObjectDoesNotExist:
|
||||
# Refresh only the denormalized FK instead of the whole model.
|
||||
# The expected problem here is in-memory staleness during path
|
||||
# rebuilds, not persistent database corruption.
|
||||
self.refresh_from_db(fields=['_path'])
|
||||
return self._path if self._path_id else None
|
||||
|
||||
@cached_property
|
||||
def connected_endpoints(self):
|
||||
"""
|
||||
Caching accessor for the attached CablePath's destination (if any)
|
||||
Caching accessor for the attached CablePath's destinations (if any).
|
||||
|
||||
Always route through `path` so stale in-memory `_path` references are
|
||||
repaired before we cache the result for the lifetime of this instance.
|
||||
"""
|
||||
return self._path.destinations if self._path else []
|
||||
if cable_path := self.path:
|
||||
return cable_path.destinations
|
||||
return []
|
||||
|
||||
|
||||
#
|
||||
|
||||
@@ -1149,7 +1149,7 @@ class VirtualDeviceContextTable(TenancyColumnsMixin, PrimaryModelTable):
|
||||
)
|
||||
device = tables.Column(
|
||||
verbose_name=_('Device'),
|
||||
order_by=('device___name',),
|
||||
order_by=('device__name',),
|
||||
linkify=True
|
||||
)
|
||||
status = columns.ChoiceFieldColumn(
|
||||
|
||||
@@ -56,7 +56,9 @@ class ModuleTypeTable(PrimaryModelTable):
|
||||
template_code=WEIGHT,
|
||||
order_by=('_abs_weight', 'weight_unit')
|
||||
)
|
||||
attributes = columns.DictColumn()
|
||||
attributes = columns.DictColumn(
|
||||
orderable=False,
|
||||
)
|
||||
module_count = columns.LinkedCountColumn(
|
||||
viewname='dcim:module_list',
|
||||
url_params={'module_type_id': 'pk'},
|
||||
|
||||
@@ -5,6 +5,7 @@ from circuits.models import *
|
||||
from core.models import ObjectType
|
||||
from dcim.choices import *
|
||||
from dcim.models import *
|
||||
from extras.events import serialize_for_event
|
||||
from extras.models import CustomField
|
||||
from ipam.models import Prefix
|
||||
from netbox.choices import WeightUnitChoices
|
||||
@@ -893,6 +894,77 @@ class ModuleBayTestCase(TestCase):
|
||||
nested_bay = module.modulebays.get(name='Sub-bay 1-1')
|
||||
self.assertEqual(nested_bay.position, '1-1')
|
||||
|
||||
@tag('regression') # #20474
|
||||
def test_single_module_token_at_nested_depth(self):
|
||||
"""
|
||||
A module type with a single {module} token should install at depth > 1
|
||||
without raising a token count mismatch error, resolving to the immediate
|
||||
parent bay's position.
|
||||
"""
|
||||
manufacturer = Manufacturer.objects.first()
|
||||
site = Site.objects.first()
|
||||
device_role = DeviceRole.objects.first()
|
||||
|
||||
device_type = DeviceType.objects.create(
|
||||
manufacturer=manufacturer,
|
||||
model='Chassis with Rear Card',
|
||||
slug='chassis-with-rear-card'
|
||||
)
|
||||
ModuleBayTemplate.objects.create(
|
||||
device_type=device_type,
|
||||
name='Rear card slot',
|
||||
position='1'
|
||||
)
|
||||
|
||||
rear_card_type = ModuleType.objects.create(
|
||||
manufacturer=manufacturer,
|
||||
model='Rear Card'
|
||||
)
|
||||
ModuleBayTemplate.objects.create(
|
||||
module_type=rear_card_type,
|
||||
name='SFP slot 1',
|
||||
position='1'
|
||||
)
|
||||
ModuleBayTemplate.objects.create(
|
||||
module_type=rear_card_type,
|
||||
name='SFP slot 2',
|
||||
position='2'
|
||||
)
|
||||
|
||||
sfp_type = ModuleType.objects.create(
|
||||
manufacturer=manufacturer,
|
||||
model='SFP Module'
|
||||
)
|
||||
InterfaceTemplate.objects.create(
|
||||
module_type=sfp_type,
|
||||
name='SFP {module}',
|
||||
type=InterfaceTypeChoices.TYPE_10GE_SFP_PLUS
|
||||
)
|
||||
|
||||
device = Device.objects.create(
|
||||
name='Test Chassis',
|
||||
device_type=device_type,
|
||||
role=device_role,
|
||||
site=site
|
||||
)
|
||||
|
||||
rear_card_bay = device.modulebays.get(name='Rear card slot')
|
||||
rear_card = Module.objects.create(
|
||||
device=device,
|
||||
module_bay=rear_card_bay,
|
||||
module_type=rear_card_type
|
||||
)
|
||||
|
||||
sfp_bay = rear_card.modulebays.get(name='SFP slot 2')
|
||||
sfp_module = Module.objects.create(
|
||||
device=device,
|
||||
module_bay=sfp_bay,
|
||||
module_type=sfp_type
|
||||
)
|
||||
|
||||
interface = sfp_module.interfaces.first()
|
||||
self.assertEqual(interface.name, 'SFP 2')
|
||||
|
||||
@tag('regression') # #20912
|
||||
def test_module_bay_parent_cleared_when_module_removed(self):
|
||||
"""Test that the parent field is properly cleared when a module bay's module assignment is removed"""
|
||||
@@ -1274,6 +1346,65 @@ class CableTestCase(TestCase):
|
||||
self.assertEqual(a_terms, [interface1])
|
||||
self.assertEqual(b_terms, [interface2])
|
||||
|
||||
@tag('regression') # #21498
|
||||
def test_path_refreshes_replaced_cablepath_reference(self):
|
||||
"""
|
||||
An already-instantiated interface should refresh its denormalized
|
||||
`_path` foreign key when the referenced CablePath row has been
|
||||
replaced in the database.
|
||||
"""
|
||||
stale_interface = Interface.objects.get(device__name='TestDevice1', name='eth0')
|
||||
old_path = CablePath.objects.get(pk=stale_interface._path_id)
|
||||
|
||||
new_path = CablePath(
|
||||
path=old_path.path,
|
||||
is_active=old_path.is_active,
|
||||
is_complete=old_path.is_complete,
|
||||
is_split=old_path.is_split,
|
||||
)
|
||||
old_path_id = old_path.pk
|
||||
old_path.delete()
|
||||
new_path.save()
|
||||
|
||||
# The old CablePath no longer exists
|
||||
self.assertFalse(CablePath.objects.filter(pk=old_path_id).exists())
|
||||
|
||||
# The already-instantiated interface still points to the deleted path
|
||||
# until the accessor refreshes `_path` from the database.
|
||||
self.assertEqual(stale_interface._path_id, old_path_id)
|
||||
self.assertEqual(stale_interface.path.pk, new_path.pk)
|
||||
|
||||
@tag('regression') # #21498
|
||||
def test_serialize_for_event_handles_stale_cablepath_reference_after_retermination(self):
|
||||
"""
|
||||
Serializing an interface whose previously cached `_path` row has been
|
||||
deleted during cable retermination must not raise.
|
||||
"""
|
||||
stale_interface = Interface.objects.get(device__name='TestDevice2', name='eth0')
|
||||
old_path_id = stale_interface._path_id
|
||||
new_peer = Interface.objects.get(device__name='TestDevice2', name='eth1')
|
||||
cable = stale_interface.cable
|
||||
|
||||
self.assertIsNotNone(cable)
|
||||
self.assertIsNotNone(old_path_id)
|
||||
self.assertEqual(stale_interface.cable_end, 'B')
|
||||
|
||||
cable.b_terminations = [new_peer]
|
||||
cable.save()
|
||||
|
||||
# The old CablePath was deleted during retrace.
|
||||
self.assertFalse(CablePath.objects.filter(pk=old_path_id).exists())
|
||||
|
||||
# The stale in-memory instance still holds the deleted FK value.
|
||||
self.assertEqual(stale_interface._path_id, old_path_id)
|
||||
|
||||
# Serialization must not raise ObjectDoesNotExist. Because this interface
|
||||
# was the former B-side termination, it is now disconnected.
|
||||
data = serialize_for_event(stale_interface)
|
||||
self.assertIsNone(data['connected_endpoints'])
|
||||
self.assertIsNone(data['connected_endpoints_type'])
|
||||
self.assertFalse(data['connected_endpoints_reachable'])
|
||||
|
||||
|
||||
class VirtualDeviceContextTestCase(TestCase):
|
||||
|
||||
|
||||
204
netbox/dcim/tests/test_tables.py
Normal file
204
netbox/dcim/tests/test_tables.py
Normal file
@@ -0,0 +1,204 @@
|
||||
from dcim.models import ConsolePort, Interface, PowerPort
|
||||
from dcim.tables import *
|
||||
from utilities.testing import TableTestCases
|
||||
|
||||
#
|
||||
# Sites
|
||||
#
|
||||
|
||||
|
||||
class RegionTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = RegionTable
|
||||
|
||||
|
||||
class SiteGroupTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = SiteGroupTable
|
||||
|
||||
|
||||
class SiteTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = SiteTable
|
||||
|
||||
|
||||
class LocationTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = LocationTable
|
||||
|
||||
|
||||
#
|
||||
# Racks
|
||||
#
|
||||
|
||||
class RackRoleTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = RackRoleTable
|
||||
|
||||
|
||||
class RackTypeTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = RackTypeTable
|
||||
|
||||
|
||||
class RackTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = RackTable
|
||||
|
||||
|
||||
class RackReservationTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = RackReservationTable
|
||||
|
||||
|
||||
#
|
||||
# Device types
|
||||
#
|
||||
|
||||
class ManufacturerTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = ManufacturerTable
|
||||
|
||||
|
||||
class DeviceTypeTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = DeviceTypeTable
|
||||
|
||||
|
||||
#
|
||||
# Module types
|
||||
#
|
||||
|
||||
class ModuleTypeProfileTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = ModuleTypeProfileTable
|
||||
|
||||
|
||||
class ModuleTypeTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = ModuleTypeTable
|
||||
|
||||
|
||||
class ModuleTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = ModuleTable
|
||||
|
||||
|
||||
#
|
||||
# Devices
|
||||
#
|
||||
|
||||
class DeviceRoleTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = DeviceRoleTable
|
||||
|
||||
|
||||
class PlatformTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = PlatformTable
|
||||
|
||||
|
||||
class DeviceTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = DeviceTable
|
||||
|
||||
|
||||
#
|
||||
# Device components
|
||||
#
|
||||
|
||||
class ConsolePortTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = ConsolePortTable
|
||||
|
||||
|
||||
class ConsoleServerPortTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = ConsoleServerPortTable
|
||||
|
||||
|
||||
class PowerPortTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = PowerPortTable
|
||||
|
||||
|
||||
class PowerOutletTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = PowerOutletTable
|
||||
|
||||
|
||||
class InterfaceTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = InterfaceTable
|
||||
|
||||
|
||||
class FrontPortTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = FrontPortTable
|
||||
|
||||
|
||||
class RearPortTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = RearPortTable
|
||||
|
||||
|
||||
class ModuleBayTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = ModuleBayTable
|
||||
|
||||
|
||||
class DeviceBayTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = DeviceBayTable
|
||||
|
||||
|
||||
class InventoryItemTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = InventoryItemTable
|
||||
|
||||
|
||||
class InventoryItemRoleTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = InventoryItemRoleTable
|
||||
|
||||
|
||||
#
|
||||
# Connections
|
||||
#
|
||||
|
||||
class ConsoleConnectionTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = ConsoleConnectionTable
|
||||
queryset_sources = [
|
||||
('ConsoleConnectionsListView', ConsolePort.objects.filter(_path__is_complete=True)),
|
||||
]
|
||||
|
||||
|
||||
class PowerConnectionTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = PowerConnectionTable
|
||||
queryset_sources = [
|
||||
('PowerConnectionsListView', PowerPort.objects.filter(_path__is_complete=True)),
|
||||
]
|
||||
|
||||
|
||||
class InterfaceConnectionTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = InterfaceConnectionTable
|
||||
queryset_sources = [
|
||||
('InterfaceConnectionsListView', Interface.objects.filter(_path__is_complete=True)),
|
||||
]
|
||||
|
||||
|
||||
#
|
||||
# Cables
|
||||
#
|
||||
|
||||
class CableTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = CableTable
|
||||
|
||||
|
||||
#
|
||||
# Power
|
||||
#
|
||||
|
||||
class PowerPanelTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = PowerPanelTable
|
||||
|
||||
|
||||
class PowerFeedTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = PowerFeedTable
|
||||
|
||||
|
||||
#
|
||||
# Virtual chassis
|
||||
#
|
||||
|
||||
class VirtualChassisTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = VirtualChassisTable
|
||||
|
||||
|
||||
#
|
||||
# Virtual device contexts
|
||||
#
|
||||
|
||||
class VirtualDeviceContextTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = VirtualDeviceContextTable
|
||||
|
||||
|
||||
#
|
||||
# MAC addresses
|
||||
#
|
||||
|
||||
class MACAddressTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = MACAddressTable
|
||||
@@ -3,6 +3,9 @@ from collections import defaultdict
|
||||
from django.apps import apps
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.db import router, transaction
|
||||
from django.utils.translation import gettext as _
|
||||
|
||||
from dcim.constants import MODULE_TOKEN
|
||||
|
||||
|
||||
def compile_path_node(ct_id, object_id):
|
||||
@@ -33,6 +36,51 @@ def path_node_to_object(repr):
|
||||
return ct.model_class().objects.filter(pk=object_id).first()
|
||||
|
||||
|
||||
def get_module_bay_positions(module_bay):
|
||||
"""
|
||||
Given a module bay, traverse up the module hierarchy and return
|
||||
a list of bay position strings from root to leaf.
|
||||
"""
|
||||
positions = []
|
||||
while module_bay:
|
||||
positions.append(module_bay.position)
|
||||
if module_bay.module:
|
||||
module_bay = module_bay.module.module_bay
|
||||
else:
|
||||
module_bay = None
|
||||
positions.reverse()
|
||||
return positions
|
||||
|
||||
|
||||
def resolve_module_placeholder(value, positions):
|
||||
"""
|
||||
Resolve {module} placeholder tokens in a string using the given
|
||||
list of module bay positions (ordered root to leaf).
|
||||
|
||||
A single {module} token resolves to the leaf (immediate parent) bay's position.
|
||||
Multiple tokens must match the tree depth and resolve level-by-level.
|
||||
|
||||
Returns the resolved string.
|
||||
Raises ValueError if token count is greater than 1 and doesn't match tree depth.
|
||||
"""
|
||||
if MODULE_TOKEN not in value:
|
||||
return value
|
||||
|
||||
token_count = value.count(MODULE_TOKEN)
|
||||
if token_count == 1:
|
||||
return value.replace(MODULE_TOKEN, positions[-1])
|
||||
if token_count == len(positions):
|
||||
for pos in positions:
|
||||
value = value.replace(MODULE_TOKEN, pos, 1)
|
||||
return value
|
||||
raise ValueError(
|
||||
_("Cannot install module with placeholder values in a module bay tree "
|
||||
"{level} levels deep but {tokens} placeholders given.").format(
|
||||
level=len(positions), tokens=token_count
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def create_cablepaths(objects):
|
||||
"""
|
||||
Create CablePaths for all paths originating from the specified set of nodes.
|
||||
|
||||
@@ -1,19 +1,70 @@
|
||||
from django.utils.translation import gettext as _
|
||||
import logging
|
||||
|
||||
from django.core.files.storage import storages
|
||||
from django.db import IntegrityError
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from drf_spectacular.utils import extend_schema_field
|
||||
from rest_framework import serializers
|
||||
|
||||
from core.api.serializers_.jobs import JobSerializer
|
||||
from extras.models import Script
|
||||
from core.choices import ManagedFileRootPathChoices
|
||||
from extras.models import Script, ScriptModule
|
||||
from netbox.api.serializers import ValidatedModelSerializer
|
||||
from utilities.datetime import local_now
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
__all__ = (
|
||||
'ScriptDetailSerializer',
|
||||
'ScriptInputSerializer',
|
||||
'ScriptModuleSerializer',
|
||||
'ScriptSerializer',
|
||||
)
|
||||
|
||||
|
||||
class ScriptModuleSerializer(ValidatedModelSerializer):
|
||||
file = serializers.FileField(write_only=True)
|
||||
file_path = serializers.CharField(read_only=True)
|
||||
|
||||
class Meta:
|
||||
model = ScriptModule
|
||||
fields = ['id', 'display', 'file_path', 'file', 'created', 'last_updated']
|
||||
brief_fields = ('id', 'display')
|
||||
|
||||
def validate(self, data):
|
||||
# ScriptModule.save() sets file_root; inject it here so full_clean() succeeds.
|
||||
# Pop 'file' before model instantiation — ScriptModule has no such field.
|
||||
file = data.pop('file', None)
|
||||
data['file_root'] = ManagedFileRootPathChoices.SCRIPTS
|
||||
data = super().validate(data)
|
||||
data.pop('file_root', None)
|
||||
if file is not None:
|
||||
data['file'] = file
|
||||
return data
|
||||
|
||||
def create(self, validated_data):
|
||||
file = validated_data.pop('file')
|
||||
storage = storages.create_storage(storages.backends["scripts"])
|
||||
validated_data['file_path'] = storage.save(file.name, file)
|
||||
created = False
|
||||
try:
|
||||
instance = super().create(validated_data)
|
||||
created = True
|
||||
return instance
|
||||
except IntegrityError as e:
|
||||
if 'file_path' in str(e):
|
||||
raise serializers.ValidationError(
|
||||
_("A script module with this file name already exists.")
|
||||
)
|
||||
raise
|
||||
finally:
|
||||
if not created and (file_path := validated_data.get('file_path')):
|
||||
try:
|
||||
storage.delete(file_path)
|
||||
except Exception:
|
||||
logger.warning(f"Failed to delete orphaned script file '{file_path}' from storage.")
|
||||
|
||||
|
||||
class ScriptSerializer(ValidatedModelSerializer):
|
||||
description = serializers.SerializerMethodField(read_only=True)
|
||||
vars = serializers.SerializerMethodField(read_only=True)
|
||||
|
||||
@@ -26,6 +26,7 @@ router.register('journal-entries', views.JournalEntryViewSet)
|
||||
router.register('config-contexts', views.ConfigContextViewSet)
|
||||
router.register('config-context-profiles', views.ConfigContextProfileViewSet)
|
||||
router.register('config-templates', views.ConfigTemplateViewSet)
|
||||
router.register('scripts/upload', views.ScriptModuleViewSet)
|
||||
router.register('scripts', views.ScriptViewSet, basename='script')
|
||||
|
||||
app_name = 'extras-api'
|
||||
|
||||
@@ -6,7 +6,7 @@ from rest_framework import status
|
||||
from rest_framework.decorators import action
|
||||
from rest_framework.exceptions import PermissionDenied
|
||||
from rest_framework.generics import RetrieveUpdateDestroyAPIView
|
||||
from rest_framework.mixins import ListModelMixin, RetrieveModelMixin
|
||||
from rest_framework.mixins import CreateModelMixin, ListModelMixin, RetrieveModelMixin
|
||||
from rest_framework.renderers import JSONRenderer
|
||||
from rest_framework.response import Response
|
||||
from rest_framework.routers import APIRootView
|
||||
@@ -21,6 +21,7 @@ from netbox.api.features import SyncedDataMixin
|
||||
from netbox.api.metadata import ContentTypeMetadata
|
||||
from netbox.api.renderers import TextRenderer
|
||||
from netbox.api.viewsets import BaseViewSet, NetBoxModelViewSet
|
||||
from netbox.api.viewsets.mixins import ObjectValidationMixin
|
||||
from utilities.exceptions import RQWorkerNotRunningException
|
||||
from utilities.request import copy_safe_request
|
||||
|
||||
@@ -264,6 +265,11 @@ class ConfigTemplateViewSet(SyncedDataMixin, ConfigTemplateRenderMixin, NetBoxMo
|
||||
# Scripts
|
||||
#
|
||||
|
||||
class ScriptModuleViewSet(ObjectValidationMixin, CreateModelMixin, BaseViewSet):
|
||||
queryset = ScriptModule.objects.all()
|
||||
serializer_class = serializers.ScriptModuleSerializer
|
||||
|
||||
|
||||
@extend_schema_view(
|
||||
update=extend_schema(request=serializers.ScriptInputSerializer),
|
||||
partial_update=extend_schema(request=serializers.ScriptInputSerializer),
|
||||
|
||||
@@ -25,16 +25,54 @@ logger = logging.getLogger('netbox.events_processor')
|
||||
|
||||
class EventContext(UserDict):
|
||||
"""
|
||||
A custom dictionary that automatically serializes its associated object on demand.
|
||||
Dictionary-compatible wrapper for queued events that lazily serializes
|
||||
``event['data']`` on first access.
|
||||
|
||||
Backward-compatible with the plain-dict interface expected by existing
|
||||
EVENTS_PIPELINE consumers. When the same object is enqueued more than once
|
||||
in a single request, the serialization source is updated so consumers see
|
||||
the latest state.
|
||||
"""
|
||||
|
||||
# We're emulating a dictionary here (rather than using a custom class) because prior to NetBox v4.5.2, events were
|
||||
# queued as dictionaries for processing by handles in EVENTS_PIPELINE. We need to avoid introducing any breaking
|
||||
# changes until a suitable minor release.
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
# Track which model instance should be serialized if/when `data` is
|
||||
# requested. This may be refreshed on duplicate enqueue, while leaving
|
||||
# the public `object` entry untouched for compatibility.
|
||||
self._serialization_source = None
|
||||
if 'object' in self:
|
||||
self._serialization_source = super().__getitem__('object')
|
||||
|
||||
def refresh_serialization_source(self, instance):
|
||||
"""
|
||||
Point lazy serialization at a fresher instance, invalidating any
|
||||
already-materialized ``data``.
|
||||
"""
|
||||
self._serialization_source = instance
|
||||
# UserDict.__contains__ checks the backing dict directly, so `in`
|
||||
# does not trigger __getitem__'s lazy serialization.
|
||||
if 'data' in self:
|
||||
del self['data']
|
||||
|
||||
def freeze_data(self, instance):
|
||||
"""
|
||||
Eagerly serialize and cache the payload for delete events, where the
|
||||
object may become inaccessible after deletion.
|
||||
"""
|
||||
super().__setitem__('data', serialize_for_event(instance))
|
||||
self._serialization_source = None
|
||||
|
||||
def __getitem__(self, item):
|
||||
if item == 'data' and 'data' not in self:
|
||||
data = serialize_for_event(self['object'])
|
||||
self.__setitem__('data', data)
|
||||
# Materialize the payload only when an event consumer asks for it.
|
||||
#
|
||||
# On coalesced events, use the latest explicitly queued instance so
|
||||
# webhooks/scripts/notifications observe the final queued state for
|
||||
# that object within the request.
|
||||
source = self._serialization_source or super().__getitem__('object')
|
||||
super().__setitem__('data', serialize_for_event(source))
|
||||
|
||||
return super().__getitem__(item)
|
||||
|
||||
|
||||
@@ -76,8 +114,9 @@ def get_snapshots(instance, event_type):
|
||||
|
||||
def enqueue_event(queue, instance, request, event_type):
|
||||
"""
|
||||
Enqueue a serialized representation of a created/updated/deleted object for the processing of
|
||||
events once the request has completed.
|
||||
Enqueue (or coalesce) an event for a created/updated/deleted object.
|
||||
|
||||
Events are processed after the request completes.
|
||||
"""
|
||||
# Bail if this type of object does not support event rules
|
||||
if not has_feature(instance, 'event_rules'):
|
||||
@@ -88,11 +127,18 @@ def enqueue_event(queue, instance, request, event_type):
|
||||
|
||||
assert instance.pk is not None
|
||||
key = f'{app_label}.{model_name}:{instance.pk}'
|
||||
|
||||
if key in queue:
|
||||
queue[key]['snapshots']['postchange'] = get_snapshots(instance, event_type)['postchange']
|
||||
# If the object is being deleted, update any prior "update" event to "delete"
|
||||
|
||||
# If the object is being deleted, convert any prior update event into a
|
||||
# delete event and freeze the payload before the object (or related
|
||||
# rows) become inaccessible.
|
||||
if event_type == OBJECT_DELETED:
|
||||
queue[key]['event_type'] = event_type
|
||||
else:
|
||||
# Keep the public `object` entry stable for compatibility.
|
||||
queue[key].refresh_serialization_source(instance)
|
||||
else:
|
||||
queue[key] = EventContext(
|
||||
object_type=ObjectType.objects.get_for_model(instance),
|
||||
@@ -106,9 +152,11 @@ def enqueue_event(queue, instance, request, event_type):
|
||||
username=request.user.username, # DEPRECATED, will be removed in NetBox v4.7.0
|
||||
request_id=request.id, # DEPRECATED, will be removed in NetBox v4.7.0
|
||||
)
|
||||
# Force serialization of objects prior to them actually being deleted
|
||||
|
||||
# For delete events, eagerly serialize the payload before the row is gone.
|
||||
# This covers both first-time enqueues and coalesced update→delete promotions.
|
||||
if event_type == OBJECT_DELETED:
|
||||
queue[key]['data'] = serialize_for_event(instance)
|
||||
queue[key].freeze_data(instance)
|
||||
|
||||
|
||||
def process_event_rules(event_rules, object_type, event):
|
||||
@@ -133,9 +181,9 @@ def process_event_rules(event_rules, object_type, event):
|
||||
if not event_rule.eval_conditions(event['data']):
|
||||
continue
|
||||
|
||||
# Compile event data
|
||||
event_data = event_rule.action_data or {}
|
||||
event_data.update(event['data'])
|
||||
# Merge rule-specific action_data with the event payload.
|
||||
# Copy to avoid mutating the rule's stored action_data dict.
|
||||
event_data = {**(event_rule.action_data or {}), **event['data']}
|
||||
|
||||
# Webhooks
|
||||
if event_rule.action_type == EventRuleActionChoices.WEBHOOK:
|
||||
|
||||
@@ -417,6 +417,7 @@ class NotificationTable(NetBoxTable):
|
||||
icon = columns.TemplateColumn(
|
||||
template_code=NOTIFICATION_ICON,
|
||||
accessor=tables.A('event'),
|
||||
orderable=False,
|
||||
attrs={
|
||||
'td': {'class': 'w-1'},
|
||||
'th': {'class': 'w-1'},
|
||||
@@ -479,8 +480,8 @@ class WebhookTable(NetBoxTable):
|
||||
verbose_name=_('Name'),
|
||||
linkify=True
|
||||
)
|
||||
ssl_validation = columns.BooleanColumn(
|
||||
verbose_name=_('SSL Validation')
|
||||
ssl_verification = columns.BooleanColumn(
|
||||
verbose_name=_('SSL Verification'),
|
||||
)
|
||||
owner = tables.Column(
|
||||
linkify=True,
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
import datetime
|
||||
import hashlib
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.core.files.uploadedfile import SimpleUploadedFile
|
||||
from django.urls import reverse
|
||||
from django.utils.timezone import make_aware, now
|
||||
from rest_framework import status
|
||||
@@ -1384,3 +1386,54 @@ class NotificationTest(APIViewTestCases.APIViewTestCase):
|
||||
'event_type': OBJECT_DELETED,
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
class ScriptModuleTest(APITestCase):
|
||||
"""
|
||||
Tests for the POST /api/extras/scripts/upload/ endpoint.
|
||||
|
||||
ScriptModule is a proxy of core.ManagedFile (a different app) so the standard
|
||||
APIViewTestCases mixins cannot be used directly. All tests use add_permissions()
|
||||
with explicit Django model-level permissions.
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.url = reverse('extras-api:scriptmodule-list') # /api/extras/scripts/upload/
|
||||
|
||||
def test_upload_script_module_without_permission(self):
|
||||
script_content = b"from extras.scripts import Script\nclass TestScript(Script):\n pass\n"
|
||||
upload_file = SimpleUploadedFile('test_upload.py', script_content, content_type='text/plain')
|
||||
response = self.client.post(
|
||||
self.url,
|
||||
{'file': upload_file},
|
||||
format='multipart',
|
||||
**self.header,
|
||||
)
|
||||
self.assertHttpStatus(response, status.HTTP_403_FORBIDDEN)
|
||||
|
||||
def test_upload_script_module(self):
|
||||
# ScriptModule is a proxy of core.ManagedFile; both permissions required.
|
||||
self.add_permissions('extras.add_scriptmodule', 'core.add_managedfile')
|
||||
script_content = b"from extras.scripts import Script\nclass TestScript(Script):\n pass\n"
|
||||
upload_file = SimpleUploadedFile('test_upload.py', script_content, content_type='text/plain')
|
||||
mock_storage = MagicMock()
|
||||
mock_storage.save.return_value = 'test_upload.py'
|
||||
with patch('extras.api.serializers_.scripts.storages') as mock_storages:
|
||||
mock_storages.create_storage.return_value = mock_storage
|
||||
mock_storages.backends = {'scripts': {}}
|
||||
response = self.client.post(
|
||||
self.url,
|
||||
{'file': upload_file},
|
||||
format='multipart',
|
||||
**self.header,
|
||||
)
|
||||
self.assertHttpStatus(response, status.HTTP_201_CREATED)
|
||||
self.assertEqual(response.data['file_path'], 'test_upload.py')
|
||||
mock_storage.save.assert_called_once()
|
||||
self.assertTrue(ScriptModule.objects.filter(file_path='test_upload.py').exists())
|
||||
|
||||
def test_upload_script_module_without_file_fails(self):
|
||||
self.add_permissions('extras.add_scriptmodule', 'core.add_managedfile')
|
||||
response = self.client.post(self.url, {}, format='json', **self.header)
|
||||
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
import json
|
||||
import uuid
|
||||
from unittest import skipIf
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
import django_rq
|
||||
from django.conf import settings
|
||||
from django.http import HttpResponse
|
||||
from django.test import RequestFactory
|
||||
from django.urls import reverse
|
||||
@@ -343,6 +345,7 @@ class EventRuleTest(APITestCase):
|
||||
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], sites[i].name)
|
||||
self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
|
||||
|
||||
@skipIf('netbox.tests.dummy_plugin' not in settings.PLUGINS, 'dummy_plugin not in settings.PLUGINS')
|
||||
def test_send_webhook(self):
|
||||
request_id = uuid.uuid4()
|
||||
|
||||
@@ -426,6 +429,97 @@ class EventRuleTest(APITestCase):
|
||||
self.assertEqual(job.kwargs['object_type'], script_type)
|
||||
self.assertEqual(job.kwargs['username'], self.user.username)
|
||||
|
||||
def test_duplicate_enqueue_refreshes_lazy_payload(self):
|
||||
"""
|
||||
When the same object is enqueued more than once in a single request,
|
||||
lazy serialization should use the most recently enqueued instance while
|
||||
preserving the original event['object'] reference.
|
||||
"""
|
||||
request = RequestFactory().get(reverse('dcim:site_add'))
|
||||
request.id = uuid.uuid4()
|
||||
request.user = self.user
|
||||
|
||||
site = Site.objects.create(name='Site 1', slug='site-1')
|
||||
stale_site = Site.objects.get(pk=site.pk)
|
||||
|
||||
queue = {}
|
||||
enqueue_event(queue, stale_site, request, OBJECT_UPDATED)
|
||||
|
||||
event = queue[f'dcim.site:{site.pk}']
|
||||
|
||||
# Data should not be materialized yet (lazy serialization)
|
||||
self.assertNotIn('data', event.data)
|
||||
|
||||
fresh_site = Site.objects.get(pk=site.pk)
|
||||
fresh_site.description = 'foo'
|
||||
fresh_site.save()
|
||||
|
||||
enqueue_event(queue, fresh_site, request, OBJECT_UPDATED)
|
||||
|
||||
# The original object reference should be preserved
|
||||
self.assertIs(event['object'], stale_site)
|
||||
|
||||
# But serialized data should reflect the fresher instance
|
||||
self.assertEqual(event['data']['description'], 'foo')
|
||||
self.assertEqual(event['snapshots']['postchange']['description'], 'foo')
|
||||
|
||||
def test_duplicate_enqueue_invalidates_materialized_data(self):
|
||||
"""
|
||||
If event['data'] has already been materialized before a second enqueue
|
||||
for the same object, the stale payload should be discarded and rebuilt
|
||||
from the fresher instance on next access.
|
||||
"""
|
||||
request = RequestFactory().get(reverse('dcim:site_add'))
|
||||
request.id = uuid.uuid4()
|
||||
request.user = self.user
|
||||
|
||||
site = Site.objects.create(name='Site 1', slug='site-1')
|
||||
|
||||
queue = {}
|
||||
enqueue_event(queue, site, request, OBJECT_UPDATED)
|
||||
|
||||
event = queue[f'dcim.site:{site.pk}']
|
||||
|
||||
# Force early materialization
|
||||
self.assertEqual(event['data']['description'], '')
|
||||
|
||||
# Now update and re-enqueue
|
||||
fresh_site = Site.objects.get(pk=site.pk)
|
||||
fresh_site.description = 'updated'
|
||||
fresh_site.save()
|
||||
|
||||
enqueue_event(queue, fresh_site, request, OBJECT_UPDATED)
|
||||
|
||||
# Stale data should have been invalidated; new access should reflect update
|
||||
self.assertEqual(event['data']['description'], 'updated')
|
||||
|
||||
def test_update_then_delete_enqueue_freezes_payload(self):
|
||||
"""
|
||||
When an update event is coalesced with a subsequent delete, the event
|
||||
type should be promoted to OBJECT_DELETED and the payload should be
|
||||
eagerly frozen (since the object will be inaccessible after deletion).
|
||||
"""
|
||||
request = RequestFactory().get(reverse('dcim:site_add'))
|
||||
request.id = uuid.uuid4()
|
||||
request.user = self.user
|
||||
|
||||
site = Site.objects.create(name='Site 1', slug='site-1')
|
||||
|
||||
queue = {}
|
||||
enqueue_event(queue, site, request, OBJECT_UPDATED)
|
||||
|
||||
event = queue[f'dcim.site:{site.pk}']
|
||||
|
||||
enqueue_event(queue, site, request, OBJECT_DELETED)
|
||||
|
||||
# Event type should have been promoted
|
||||
self.assertEqual(event['event_type'], OBJECT_DELETED)
|
||||
|
||||
# Data should already be materialized (frozen), not lazy
|
||||
self.assertIn('data', event.data)
|
||||
self.assertEqual(event['data']['name'], 'Site 1')
|
||||
self.assertIsNone(event['snapshots']['postchange'])
|
||||
|
||||
def test_duplicate_triggers(self):
|
||||
"""
|
||||
Test for erroneous duplicate event triggers resulting from saving an object multiple times
|
||||
|
||||
@@ -1,24 +1,84 @@
|
||||
from django.test import RequestFactory, TestCase, tag
|
||||
|
||||
from extras.models import EventRule
|
||||
from extras.tables import EventRuleTable
|
||||
from extras.models import Bookmark, Notification, Subscription
|
||||
from extras.tables import *
|
||||
from utilities.testing import TableTestCases
|
||||
|
||||
|
||||
@tag('regression')
|
||||
class EventRuleTableTest(TestCase):
|
||||
def test_every_orderable_field_does_not_throw_exception(self):
|
||||
rule = EventRule.objects.all()
|
||||
disallowed = {
|
||||
'actions',
|
||||
}
|
||||
class CustomFieldTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = CustomFieldTable
|
||||
|
||||
orderable_columns = [
|
||||
column.name for column in EventRuleTable(rule).columns if column.orderable and column.name not in disallowed
|
||||
]
|
||||
fake_request = RequestFactory().get('/')
|
||||
|
||||
for col in orderable_columns:
|
||||
for direction in ('-', ''):
|
||||
table = EventRuleTable(rule)
|
||||
table.order_by = f'{direction}{col}'
|
||||
table.as_html(fake_request)
|
||||
class CustomFieldChoiceSetTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = CustomFieldChoiceSetTable
|
||||
|
||||
|
||||
class CustomLinkTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = CustomLinkTable
|
||||
|
||||
|
||||
class ExportTemplateTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = ExportTemplateTable
|
||||
|
||||
|
||||
class SavedFilterTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = SavedFilterTable
|
||||
|
||||
|
||||
class TableConfigTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = TableConfigTable
|
||||
|
||||
|
||||
class BookmarkTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = BookmarkTable
|
||||
queryset_sources = [
|
||||
('BookmarkListView', Bookmark.objects.all()),
|
||||
]
|
||||
|
||||
|
||||
class NotificationGroupTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = NotificationGroupTable
|
||||
|
||||
|
||||
class NotificationTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = NotificationTable
|
||||
queryset_sources = [
|
||||
('NotificationListView', Notification.objects.all()),
|
||||
]
|
||||
|
||||
|
||||
class SubscriptionTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = SubscriptionTable
|
||||
queryset_sources = [
|
||||
('SubscriptionListView', Subscription.objects.all()),
|
||||
]
|
||||
|
||||
|
||||
class WebhookTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = WebhookTable
|
||||
|
||||
|
||||
class EventRuleTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = EventRuleTable
|
||||
|
||||
|
||||
class TagTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = TagTable
|
||||
|
||||
|
||||
class ConfigContextProfileTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = ConfigContextProfileTable
|
||||
|
||||
|
||||
class ConfigContextTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = ConfigContextTable
|
||||
|
||||
|
||||
class ConfigTemplateTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = ConfigTemplateTable
|
||||
|
||||
|
||||
class ImageAttachmentTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = ImageAttachmentTable
|
||||
|
||||
|
||||
class JournalEntryTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = JournalEntryTable
|
||||
|
||||
@@ -247,6 +247,6 @@ class VLANTranslationRuleTable(NetBoxTable):
|
||||
class Meta(NetBoxTable.Meta):
|
||||
model = VLANTranslationRule
|
||||
fields = (
|
||||
'pk', 'id', 'name', 'policy', 'local_vid', 'remote_vid', 'description', 'tags', 'created', 'last_updated',
|
||||
'pk', 'id', 'policy', 'local_vid', 'remote_vid', 'description', 'tags', 'created', 'last_updated',
|
||||
)
|
||||
default_columns = ('pk', 'policy', 'local_vid', 'remote_vid', 'description')
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
from django.test import RequestFactory, TestCase
|
||||
from netaddr import IPNetwork
|
||||
|
||||
from ipam.models import IPAddress, IPRange, Prefix
|
||||
from ipam.tables import AnnotatedIPAddressTable
|
||||
from ipam.models import FHRPGroupAssignment, IPAddress, IPRange, Prefix
|
||||
from ipam.tables import *
|
||||
from ipam.utils import annotate_ip_space
|
||||
from utilities.testing import TableTestCases
|
||||
|
||||
|
||||
class AnnotatedIPAddressTableTest(TestCase):
|
||||
@@ -168,3 +169,82 @@ class AnnotatedIPAddressTableTest(TestCase):
|
||||
# Pools are fully usable
|
||||
self.assertEqual(available.first_ip, '2001:db8:1::/126')
|
||||
self.assertEqual(available.size, 4)
|
||||
|
||||
|
||||
#
|
||||
# Table ordering tests
|
||||
#
|
||||
|
||||
class VRFTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = VRFTable
|
||||
|
||||
|
||||
class RouteTargetTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = RouteTargetTable
|
||||
|
||||
|
||||
class RIRTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = RIRTable
|
||||
|
||||
|
||||
class AggregateTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = AggregateTable
|
||||
|
||||
|
||||
class RoleTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = RoleTable
|
||||
|
||||
|
||||
class PrefixTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = PrefixTable
|
||||
|
||||
|
||||
class IPRangeTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = IPRangeTable
|
||||
|
||||
|
||||
class IPAddressTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = IPAddressTable
|
||||
|
||||
|
||||
class FHRPGroupTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = FHRPGroupTable
|
||||
|
||||
|
||||
class FHRPGroupAssignmentTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = FHRPGroupAssignmentTable
|
||||
queryset_sources = [
|
||||
('FHRPGroupAssignmentTable', FHRPGroupAssignment.objects.all()),
|
||||
]
|
||||
|
||||
|
||||
class VLANGroupTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = VLANGroupTable
|
||||
|
||||
|
||||
class VLANTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = VLANTable
|
||||
|
||||
|
||||
class VLANTranslationPolicyTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = VLANTranslationPolicyTable
|
||||
|
||||
|
||||
class VLANTranslationRuleTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = VLANTranslationRuleTable
|
||||
|
||||
|
||||
class ASNRangeTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = ASNRangeTable
|
||||
|
||||
|
||||
class ASNTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = ASNTable
|
||||
|
||||
|
||||
class ServiceTemplateTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = ServiceTemplateTable
|
||||
|
||||
|
||||
class ServiceTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = ServiceTable
|
||||
|
||||
@@ -20,6 +20,10 @@ PLUGINS = [
|
||||
'netbox.tests.dummy_plugin',
|
||||
]
|
||||
|
||||
RQ = {
|
||||
'COMMIT_MODE': 'auto',
|
||||
}
|
||||
|
||||
REDIS = {
|
||||
'tasks': {
|
||||
'HOST': 'localhost',
|
||||
|
||||
@@ -168,6 +168,7 @@ REMOTE_AUTH_USER_FIRST_NAME = getattr(configuration, 'REMOTE_AUTH_USER_FIRST_NAM
|
||||
REMOTE_AUTH_USER_LAST_NAME = getattr(configuration, 'REMOTE_AUTH_USER_LAST_NAME', 'HTTP_REMOTE_USER_LAST_NAME')
|
||||
# Required by extras/migrations/0109_script_models.py
|
||||
REPORTS_ROOT = getattr(configuration, 'REPORTS_ROOT', os.path.join(BASE_DIR, 'reports')).rstrip('/')
|
||||
RQ = getattr(configuration, 'RQ', {})
|
||||
RQ_DEFAULT_TIMEOUT = getattr(configuration, 'RQ_DEFAULT_TIMEOUT', 300)
|
||||
RQ_RETRY_INTERVAL = getattr(configuration, 'RQ_RETRY_INTERVAL', 60)
|
||||
RQ_RETRY_MAX = getattr(configuration, 'RQ_RETRY_MAX', 0)
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
<th scope="row"><i class="mdi mdi-chip"></i> {% trans "Memory" %}</th>
|
||||
<td>
|
||||
{% if memory_sum %}
|
||||
<span title={{ memory_sum }}>{{ memory_sum|humanize_ram_megabytes }}</span>
|
||||
<span title={{ memory_sum }}>{{ memory_sum|humanize_ram_capacity }}</span>
|
||||
{% else %}
|
||||
{{ ''|placeholder }}
|
||||
{% endif %}
|
||||
@@ -24,7 +24,7 @@
|
||||
</th>
|
||||
<td>
|
||||
{% if disk_sum %}
|
||||
{{ disk_sum|humanize_disk_megabytes }}
|
||||
{{ disk_sum|humanize_disk_capacity }}
|
||||
{% else %}
|
||||
{{ ''|placeholder }}
|
||||
{% endif %}
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
<th scope="row"><i class="mdi mdi-chip"></i> {% trans "Memory" %}</th>
|
||||
<td>
|
||||
{% if object.memory %}
|
||||
<span title={{ object.memory }}>{{ object.memory|humanize_ram_megabytes }}</span>
|
||||
<span title={{ object.memory }}>{{ object.memory|humanize_ram_capacity }}</span>
|
||||
{% else %}
|
||||
{{ ''|placeholder }}
|
||||
{% endif %}
|
||||
@@ -24,7 +24,7 @@
|
||||
</th>
|
||||
<td>
|
||||
{% if object.disk %}
|
||||
{{ object.disk|humanize_disk_megabytes }}
|
||||
{{ object.disk|humanize_disk_capacity }}
|
||||
{% else %}
|
||||
{{ ''|placeholder }}
|
||||
{% endif %}
|
||||
|
||||
@@ -1,2 +1,2 @@
|
||||
{% load helpers %}
|
||||
{{ value|humanize_disk_megabytes }}
|
||||
{{ value|humanize_disk_capacity }}
|
||||
|
||||
26
netbox/tenancy/tests/test_tables.py
Normal file
26
netbox/tenancy/tests/test_tables.py
Normal file
@@ -0,0 +1,26 @@
|
||||
from tenancy.tables import *
|
||||
from utilities.testing import TableTestCases
|
||||
|
||||
|
||||
class TenantGroupTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = TenantGroupTable
|
||||
|
||||
|
||||
class TenantTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = TenantTable
|
||||
|
||||
|
||||
class ContactGroupTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = ContactGroupTable
|
||||
|
||||
|
||||
class ContactRoleTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = ContactRoleTable
|
||||
|
||||
|
||||
class ContactTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = ContactTable
|
||||
|
||||
|
||||
class ContactAssignmentTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = ContactAssignmentTable
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,24 +1,26 @@
|
||||
from django.test import RequestFactory, TestCase, tag
|
||||
|
||||
from users.models import Token
|
||||
from users.tables import TokenTable
|
||||
from users.tables import *
|
||||
from utilities.testing import TableTestCases
|
||||
|
||||
|
||||
class TokenTableTest(TestCase):
|
||||
@tag('regression')
|
||||
def test_every_orderable_field_does_not_throw_exception(self):
|
||||
tokens = Token.objects.all()
|
||||
disallowed = {'actions'}
|
||||
class TokenTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = TokenTable
|
||||
|
||||
orderable_columns = [
|
||||
column.name for column in TokenTable(tokens).columns
|
||||
if column.orderable and column.name not in disallowed
|
||||
]
|
||||
fake_request = RequestFactory().get("/")
|
||||
|
||||
for col in orderable_columns:
|
||||
for direction in ('-', ''):
|
||||
with self.subTest(col=col, direction=direction):
|
||||
table = TokenTable(tokens)
|
||||
table.order_by = f'{direction}{col}'
|
||||
table.as_html(fake_request)
|
||||
class UserTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = UserTable
|
||||
|
||||
|
||||
class GroupTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = GroupTable
|
||||
|
||||
|
||||
class ObjectPermissionTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = ObjectPermissionTable
|
||||
|
||||
|
||||
class OwnerGroupTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = OwnerGroupTable
|
||||
|
||||
|
||||
class OwnerTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = OwnerTable
|
||||
|
||||
@@ -14,6 +14,7 @@ __all__ = (
|
||||
'expand_alphanumeric_pattern',
|
||||
'expand_ipaddress_pattern',
|
||||
'form_from_model',
|
||||
'get_capacity_unit_label',
|
||||
'get_field_value',
|
||||
'get_selected_values',
|
||||
'parse_alphanumeric_range',
|
||||
@@ -130,6 +131,13 @@ def expand_ipaddress_pattern(string, family):
|
||||
yield ''.join([lead, format(i, 'x' if family == 6 else 'd'), remnant])
|
||||
|
||||
|
||||
def get_capacity_unit_label(divisor=1000):
|
||||
"""
|
||||
Return the appropriate base unit label: 'MiB' for binary (1024), 'MB' for decimal (1000).
|
||||
"""
|
||||
return 'MiB' if divisor == 1024 else 'MB'
|
||||
|
||||
|
||||
def get_field_value(form, field_name):
|
||||
"""
|
||||
Return the current bound or initial value associated with a form field, prior to calling
|
||||
|
||||
@@ -48,11 +48,13 @@ class FilterModifierWidget(forms.Widget):
|
||||
Just the value string for form validation. The modifier is reconstructed
|
||||
during rendering from the query parameter names.
|
||||
"""
|
||||
# Special handling for empty - check if field__empty exists
|
||||
# Special handling for empty modifier: return None so the underlying field does not
|
||||
# attempt to validate 'true'/'false' as a field value (e.g. a model PK). The
|
||||
# `__empty` query parameter is consumed directly by the filterset and by
|
||||
# `applied_filters`, so no value from the field itself is needed here.
|
||||
empty_param = f"{name}__empty"
|
||||
if empty_param in data:
|
||||
# Return the boolean value for empty lookup
|
||||
return data.get(empty_param)
|
||||
return None
|
||||
|
||||
# Try exact field name first
|
||||
value = self.original_widget.value_from_datadict(data, files, name)
|
||||
@@ -113,8 +115,13 @@ class FilterModifierWidget(forms.Widget):
|
||||
# Build a minimal choice list with just the selected values
|
||||
choices = []
|
||||
if pk_values:
|
||||
selected_objects = original_choices.queryset.filter(pk__in=pk_values)
|
||||
choices = [(obj.pk, str(obj)) for obj in selected_objects]
|
||||
try:
|
||||
selected_objects = original_choices.queryset.filter(pk__in=pk_values)
|
||||
choices = [(obj.pk, str(obj)) for obj in selected_objects]
|
||||
except (ValueError, TypeError):
|
||||
# pk_values may contain non-PK strings (e.g. 'true'/'false' from the
|
||||
# empty modifier); silently skip rendering selected choices in that case.
|
||||
pass
|
||||
|
||||
# Re-add the "None" option if it was selected via the null choice value
|
||||
if settings.FILTERS_NULL_CHOICE_VALUE in values:
|
||||
|
||||
@@ -20,8 +20,8 @@ __all__ = (
|
||||
'divide',
|
||||
'get_item',
|
||||
'get_key',
|
||||
'humanize_disk_megabytes',
|
||||
'humanize_ram_megabytes',
|
||||
'humanize_disk_capacity',
|
||||
'humanize_ram_capacity',
|
||||
'humanize_speed',
|
||||
'icon_from_status',
|
||||
'kg_to_pounds',
|
||||
@@ -208,42 +208,52 @@ def humanize_speed(speed):
|
||||
return '{} Kbps'.format(speed)
|
||||
|
||||
|
||||
def _humanize_megabytes(mb, divisor=1000):
|
||||
def _humanize_capacity(value, divisor=1000):
|
||||
"""
|
||||
Express a number of megabytes in the most suitable unit (e.g. gigabytes, terabytes, etc.).
|
||||
Express a capacity value in the most suitable unit (e.g. GB, TiB, etc.).
|
||||
|
||||
The value is treated as a unitless base-unit quantity; the divisor determines
|
||||
both the scaling thresholds and the label convention:
|
||||
- 1000: SI labels (MB, GB, TB, PB)
|
||||
- 1024: IEC labels (MiB, GiB, TiB, PiB)
|
||||
"""
|
||||
if not mb:
|
||||
if not value:
|
||||
return ""
|
||||
|
||||
if divisor == 1024:
|
||||
labels = ('MiB', 'GiB', 'TiB', 'PiB')
|
||||
else:
|
||||
labels = ('MB', 'GB', 'TB', 'PB')
|
||||
|
||||
PB_SIZE = divisor**3
|
||||
TB_SIZE = divisor**2
|
||||
GB_SIZE = divisor
|
||||
|
||||
if mb >= PB_SIZE:
|
||||
return f"{mb / PB_SIZE:.2f} PB"
|
||||
if mb >= TB_SIZE:
|
||||
return f"{mb / TB_SIZE:.2f} TB"
|
||||
if mb >= GB_SIZE:
|
||||
return f"{mb / GB_SIZE:.2f} GB"
|
||||
return f"{mb} MB"
|
||||
if value >= PB_SIZE:
|
||||
return f"{value / PB_SIZE:.2f} {labels[3]}"
|
||||
if value >= TB_SIZE:
|
||||
return f"{value / TB_SIZE:.2f} {labels[2]}"
|
||||
if value >= GB_SIZE:
|
||||
return f"{value / GB_SIZE:.2f} {labels[1]}"
|
||||
return f"{value} {labels[0]}"
|
||||
|
||||
|
||||
@register.filter()
|
||||
def humanize_disk_megabytes(mb):
|
||||
def humanize_disk_capacity(value):
|
||||
"""
|
||||
Express a number of megabytes in the most suitable unit (e.g. gigabytes, terabytes, etc.).
|
||||
Use the DISK_BASE_UNIT setting to determine the divisor. Default is 1000.
|
||||
Express a disk capacity in the most suitable unit, using the DISK_BASE_UNIT
|
||||
setting to select SI (MB/GB) or IEC (MiB/GiB) labels.
|
||||
"""
|
||||
return _humanize_megabytes(mb, DISK_BASE_UNIT)
|
||||
return _humanize_capacity(value, DISK_BASE_UNIT)
|
||||
|
||||
|
||||
@register.filter()
|
||||
def humanize_ram_megabytes(mb):
|
||||
def humanize_ram_capacity(value):
|
||||
"""
|
||||
Express a number of megabytes in the most suitable unit (e.g. gigabytes, terabytes, etc.).
|
||||
Use the RAM_BASE_UNIT setting to determine the divisor. Default is 1000.
|
||||
Express a RAM capacity in the most suitable unit, using the RAM_BASE_UNIT
|
||||
setting to select SI (MB/GB) or IEC (MiB/GiB) labels.
|
||||
"""
|
||||
return _humanize_megabytes(mb, RAM_BASE_UNIT)
|
||||
return _humanize_capacity(value, RAM_BASE_UNIT)
|
||||
|
||||
|
||||
@register.filter()
|
||||
@@ -481,6 +491,35 @@ def applied_filters(context, model, form, query_params):
|
||||
'link_text': link_text,
|
||||
})
|
||||
|
||||
# Handle empty modifier pills separately. `FilterModifierWidget.value_from_datadict()`
|
||||
# returns None for fields with a `field__empty` query parameter so that the underlying
|
||||
# form field does not attempt to validate 'true'/'false' as a real field value (which
|
||||
# would raise a ValidationError for ModelChoiceField). Because the value is None, these
|
||||
# fields never appear in `form.changed_data`, so we build their pills directly from the
|
||||
# query parameters here.
|
||||
for param_name, param_value in query_params.items():
|
||||
if not param_name.endswith('__empty'):
|
||||
continue
|
||||
field_name = param_name[:-len('__empty')]
|
||||
if field_name not in form.fields or field_name == 'filter_id':
|
||||
continue
|
||||
|
||||
querydict = query_params.copy()
|
||||
querydict.pop(param_name)
|
||||
label = form.fields[field_name].label or field_name
|
||||
|
||||
if param_value.lower() in ('true', '1'):
|
||||
link_text = f'{label} {_("is empty")}'
|
||||
else:
|
||||
link_text = f'{label} {_("is not empty")}'
|
||||
|
||||
applied_filters.append({
|
||||
'name': param_name,
|
||||
'value': param_value,
|
||||
'link_url': f'?{querydict.urlencode()}',
|
||||
'link_text': link_text,
|
||||
})
|
||||
|
||||
save_link = None
|
||||
if user.has_perm('extras.add_savedfilter') and 'filter_id' not in context['request'].GET:
|
||||
object_type = ObjectType.objects.get_for_model(model).pk
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from .api import *
|
||||
from .base import *
|
||||
from .filtersets import *
|
||||
from .tables import *
|
||||
from .utils import *
|
||||
from .views import *
|
||||
|
||||
130
netbox/utilities/testing/tables.py
Normal file
130
netbox/utilities/testing/tables.py
Normal file
@@ -0,0 +1,130 @@
|
||||
import inspect
|
||||
from importlib import import_module
|
||||
|
||||
from django.test import RequestFactory
|
||||
|
||||
from netbox.views import generic
|
||||
|
||||
from .base import TestCase
|
||||
|
||||
__all__ = (
|
||||
"ModelTableTestCase",
|
||||
"TableTestCases",
|
||||
)
|
||||
|
||||
|
||||
class ModelTableTestCase(TestCase):
|
||||
"""
|
||||
Shared helpers for model-backed table tests.
|
||||
|
||||
Concrete subclasses should set `table` and may override `get_queryset()`
|
||||
or `excluded_orderable_columns` as needed.
|
||||
"""
|
||||
table = None
|
||||
excluded_orderable_columns = frozenset({"actions"})
|
||||
|
||||
# Optional explicit override for odd cases
|
||||
queryset_sources = None
|
||||
|
||||
# Only these view types are considered sortable queryset sources by default
|
||||
queryset_source_view_classes = (generic.ObjectListView,)
|
||||
|
||||
@classmethod
|
||||
def validate_table_test_case(cls):
|
||||
if cls.table is None:
|
||||
raise AssertionError(f"{cls.__name__} must define `table`")
|
||||
if getattr(cls.table._meta, "model", None) is None:
|
||||
raise AssertionError(f"{cls.__name__}.table must be model-backed")
|
||||
|
||||
def get_request(self):
|
||||
request = RequestFactory().get("/")
|
||||
request.user = self.user
|
||||
return request
|
||||
|
||||
def get_table(self, queryset):
|
||||
return self.table(queryset)
|
||||
|
||||
@classmethod
|
||||
def is_queryset_source_view(cls, view):
|
||||
model = cls.table._meta.model
|
||||
app_label = model._meta.app_label
|
||||
|
||||
return (
|
||||
inspect.isclass(view)
|
||||
and view.__module__.startswith(f"{app_label}.views")
|
||||
and getattr(view, "table", None) is cls.table
|
||||
and getattr(view, "queryset", None) is not None
|
||||
and issubclass(view, cls.queryset_source_view_classes)
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def get_queryset_sources(cls):
|
||||
"""
|
||||
Return iterable of (label, queryset) pairs to test.
|
||||
|
||||
By default, only discover list-style views that declare this table.
|
||||
That keeps bulk edit/delete confirmation tables out of the ordering
|
||||
smoke test.
|
||||
"""
|
||||
if cls.queryset_sources is not None:
|
||||
return tuple(cls.queryset_sources)
|
||||
|
||||
model = cls.table._meta.model
|
||||
app_label = model._meta.app_label
|
||||
module = import_module(f"{app_label}.views")
|
||||
|
||||
sources = []
|
||||
for _, view in inspect.getmembers(module, inspect.isclass):
|
||||
if not cls.is_queryset_source_view(view):
|
||||
continue
|
||||
|
||||
queryset = view.queryset
|
||||
if hasattr(queryset, "all"):
|
||||
queryset = queryset.all()
|
||||
|
||||
sources.append((view.__name__, queryset))
|
||||
|
||||
if not sources:
|
||||
raise AssertionError(
|
||||
f"{cls.__name__} could not find any list-style queryset source for "
|
||||
f"{cls.table.__module__}.{cls.table.__name__}; "
|
||||
"set `queryset_sources` explicitly if needed."
|
||||
)
|
||||
|
||||
return tuple(sources)
|
||||
|
||||
def iter_orderable_columns(self, queryset):
|
||||
for column in self.get_table(queryset).columns:
|
||||
if not column.orderable:
|
||||
continue
|
||||
if column.name in self.excluded_orderable_columns:
|
||||
continue
|
||||
yield column.name
|
||||
|
||||
|
||||
class TableTestCases:
|
||||
"""
|
||||
Keep test_* methods nested to avoid unittest auto-discovering the reusable
|
||||
base classes directly.
|
||||
"""
|
||||
|
||||
class OrderableColumnsTestCase(ModelTableTestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super().setUpClass()
|
||||
cls.validate_table_test_case()
|
||||
|
||||
def test_every_orderable_column_renders(self):
|
||||
request = self.get_request()
|
||||
|
||||
for source_name, queryset in self.get_queryset_sources():
|
||||
for column_name in self.iter_orderable_columns(queryset):
|
||||
for direction, prefix in (("asc", ""), ("desc", "-")):
|
||||
with self.cleanupSubTest(
|
||||
source=source_name,
|
||||
column=column_name,
|
||||
direction=direction,
|
||||
):
|
||||
table = self.get_table(queryset)
|
||||
table.order_by = f"{prefix}{column_name}"
|
||||
table.as_html(request)
|
||||
@@ -6,8 +6,11 @@ from django.template import Context
|
||||
from django.test import RequestFactory, TestCase
|
||||
|
||||
import dcim.filtersets # noqa: F401 - Import to register Device filterset
|
||||
from dcim.forms.filtersets import DeviceFilterForm
|
||||
from dcim.models import Device
|
||||
from core.models import ObjectType
|
||||
from dcim.forms.filtersets import DeviceFilterForm, SiteFilterForm
|
||||
from dcim.models import Device, Manufacturer, Site
|
||||
from extras.choices import CustomFieldTypeChoices
|
||||
from extras.models import CustomField
|
||||
from netbox.filtersets import BaseFilterSet
|
||||
from tenancy.models import Tenant
|
||||
from users.models import User
|
||||
@@ -338,3 +341,70 @@ class EmptyLookupTest(TestCase):
|
||||
self.assertGreater(len(result['applied_filters']), 0)
|
||||
filter_pill = result['applied_filters'][0]
|
||||
self.assertIn('not empty', filter_pill['link_text'].lower())
|
||||
|
||||
|
||||
class ObjectCustomFieldEmptyLookupTest(TestCase):
|
||||
"""
|
||||
Regression test for https://github.com/netbox-community/netbox/issues/21535.
|
||||
|
||||
Rendering a filter form with an object-type custom field and the __empty modifier
|
||||
must not raise a ValueError or produce a form validation error.
|
||||
Filter pills must still appear for the empty modifier.
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def setUpTestData(cls):
|
||||
cls.user = User.objects.create(username='test_user_obj_cf')
|
||||
site_type = ObjectType.objects.get_for_model(Site)
|
||||
cf = CustomField(
|
||||
name='test_obj_cf',
|
||||
type=CustomFieldTypeChoices.TYPE_OBJECT,
|
||||
related_object_type=ObjectType.objects.get_for_model(Manufacturer),
|
||||
)
|
||||
cf.save()
|
||||
cf.object_types.set([site_type])
|
||||
|
||||
def _make_form_and_result(self, querystring):
|
||||
query_params = QueryDict(querystring)
|
||||
form = SiteFilterForm(query_params)
|
||||
request = RequestFactory().get('/', query_params)
|
||||
request.user = self.user
|
||||
context = Context({'request': request})
|
||||
result = applied_filters(context, Site, form, query_params)
|
||||
return form, result
|
||||
|
||||
def test_render_form_with_empty_true_no_error(self):
|
||||
"""Rendering SiteFilterForm with cf__empty=true must not raise ValueError."""
|
||||
query_params = QueryDict('cf_test_obj_cf__empty=true')
|
||||
form = SiteFilterForm(query_params)
|
||||
try:
|
||||
str(form['cf_test_obj_cf'])
|
||||
except ValueError as e:
|
||||
self.fail(f"Rendering object-type custom field with __empty=true raised ValueError: {e}")
|
||||
|
||||
def test_render_form_with_empty_false_no_error(self):
|
||||
"""Rendering SiteFilterForm with cf__empty=false must not raise ValueError."""
|
||||
query_params = QueryDict('cf_test_obj_cf__empty=false')
|
||||
form = SiteFilterForm(query_params)
|
||||
try:
|
||||
str(form['cf_test_obj_cf'])
|
||||
except ValueError as e:
|
||||
self.fail(f"Rendering object-type custom field with __empty=false raised ValueError: {e}")
|
||||
|
||||
def test_no_validation_error_on_empty_true(self):
|
||||
"""The filter form must not have a validation error for the field when __empty=true."""
|
||||
form, _ = self._make_form_and_result('cf_test_obj_cf__empty=true')
|
||||
form.is_valid()
|
||||
self.assertNotIn('cf_test_obj_cf', form.errors)
|
||||
|
||||
def test_filter_pill_appears_for_empty_true(self):
|
||||
"""A filter pill showing 'is empty' must be generated for an object-type CF with __empty=true."""
|
||||
_, result = self._make_form_and_result('cf_test_obj_cf__empty=true')
|
||||
self.assertGreater(len(result['applied_filters']), 0)
|
||||
self.assertIn('empty', result['applied_filters'][0]['link_text'].lower())
|
||||
|
||||
def test_filter_pill_appears_for_empty_false(self):
|
||||
"""A filter pill showing 'is not empty' must be generated for an object-type CF with __empty=false."""
|
||||
_, result = self._make_form_and_result('cf_test_obj_cf__empty=false')
|
||||
self.assertGreater(len(result['applied_filters']), 0)
|
||||
self.assertIn('not empty', result['applied_filters'][0]['link_text'].lower())
|
||||
|
||||
@@ -6,7 +6,12 @@ from netbox.choices import ImportFormatChoices
|
||||
from utilities.forms.bulk_import import BulkImportForm
|
||||
from utilities.forms.fields.csv import CSVSelectWidget
|
||||
from utilities.forms.forms import BulkRenameForm
|
||||
from utilities.forms.utils import expand_alphanumeric_pattern, expand_ipaddress_pattern, get_field_value
|
||||
from utilities.forms.utils import (
|
||||
expand_alphanumeric_pattern,
|
||||
expand_ipaddress_pattern,
|
||||
get_capacity_unit_label,
|
||||
get_field_value,
|
||||
)
|
||||
from utilities.forms.widgets.select import AvailableOptions, SelectedOptions
|
||||
|
||||
|
||||
@@ -550,3 +555,15 @@ class SelectMultipleWidgetTest(TestCase):
|
||||
self.assertEqual(widget.choices[0][1], [(2, 'Option 2')])
|
||||
self.assertEqual(widget.choices[1][0], 'Group B')
|
||||
self.assertEqual(widget.choices[1][1], [(3, 'Option 3')])
|
||||
|
||||
|
||||
class GetCapacityUnitLabelTest(TestCase):
|
||||
"""
|
||||
Test the get_capacity_unit_label function for correct base unit label.
|
||||
"""
|
||||
|
||||
def test_si_label(self):
|
||||
self.assertEqual(get_capacity_unit_label(1000), 'MB')
|
||||
|
||||
def test_iec_label(self):
|
||||
self.assertEqual(get_capacity_unit_label(1024), 'MiB')
|
||||
|
||||
@@ -3,6 +3,7 @@ from unittest.mock import patch
|
||||
from django.test import TestCase, override_settings
|
||||
|
||||
from utilities.templatetags.builtins.tags import static_with_params
|
||||
from utilities.templatetags.helpers import _humanize_capacity
|
||||
|
||||
|
||||
class StaticWithParamsTest(TestCase):
|
||||
@@ -46,3 +47,46 @@ class StaticWithParamsTest(TestCase):
|
||||
# Check that new parameter value is used
|
||||
self.assertIn('v=new_version', result)
|
||||
self.assertNotIn('v=old_version', result)
|
||||
|
||||
|
||||
class HumanizeCapacityTest(TestCase):
|
||||
"""
|
||||
Test the _humanize_capacity function for correct SI/IEC unit label selection.
|
||||
"""
|
||||
|
||||
# Tests with divisor=1000 (SI/decimal units)
|
||||
|
||||
def test_si_megabytes(self):
|
||||
self.assertEqual(_humanize_capacity(500, divisor=1000), '500 MB')
|
||||
|
||||
def test_si_gigabytes(self):
|
||||
self.assertEqual(_humanize_capacity(2000, divisor=1000), '2.00 GB')
|
||||
|
||||
def test_si_terabytes(self):
|
||||
self.assertEqual(_humanize_capacity(2000000, divisor=1000), '2.00 TB')
|
||||
|
||||
def test_si_petabytes(self):
|
||||
self.assertEqual(_humanize_capacity(2000000000, divisor=1000), '2.00 PB')
|
||||
|
||||
# Tests with divisor=1024 (IEC/binary units)
|
||||
|
||||
def test_iec_megabytes(self):
|
||||
self.assertEqual(_humanize_capacity(500, divisor=1024), '500 MiB')
|
||||
|
||||
def test_iec_gigabytes(self):
|
||||
self.assertEqual(_humanize_capacity(2048, divisor=1024), '2.00 GiB')
|
||||
|
||||
def test_iec_terabytes(self):
|
||||
self.assertEqual(_humanize_capacity(2097152, divisor=1024), '2.00 TiB')
|
||||
|
||||
def test_iec_petabytes(self):
|
||||
self.assertEqual(_humanize_capacity(2147483648, divisor=1024), '2.00 PiB')
|
||||
|
||||
# Edge cases
|
||||
|
||||
def test_empty_value(self):
|
||||
self.assertEqual(_humanize_capacity(0, divisor=1000), '')
|
||||
self.assertEqual(_humanize_capacity(None, divisor=1000), '')
|
||||
|
||||
def test_default_divisor_is_1000(self):
|
||||
self.assertEqual(_humanize_capacity(2000), '2.00 GB')
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
from django import forms
|
||||
from django.conf import settings
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from dcim.choices import InterfaceModeChoices
|
||||
@@ -13,6 +14,7 @@ from tenancy.models import Tenant
|
||||
from utilities.forms import BulkRenameForm, add_blank_choice
|
||||
from utilities.forms.fields import DynamicModelChoiceField, DynamicModelMultipleChoiceField
|
||||
from utilities.forms.rendering import FieldSet
|
||||
from utilities.forms.utils import get_capacity_unit_label
|
||||
from utilities.forms.widgets import BulkEditNullBooleanSelect
|
||||
from virtualization.choices import *
|
||||
from virtualization.models import *
|
||||
@@ -138,11 +140,11 @@ class VirtualMachineBulkEditForm(PrimaryModelBulkEditForm):
|
||||
)
|
||||
memory = forms.IntegerField(
|
||||
required=False,
|
||||
label=_('Memory (MB)')
|
||||
label=_('Memory')
|
||||
)
|
||||
disk = forms.IntegerField(
|
||||
required=False,
|
||||
label=_('Disk (MB)')
|
||||
label=_('Disk')
|
||||
)
|
||||
config_template = DynamicModelChoiceField(
|
||||
queryset=ConfigTemplate.objects.all(),
|
||||
@@ -159,6 +161,13 @@ class VirtualMachineBulkEditForm(PrimaryModelBulkEditForm):
|
||||
'site', 'cluster', 'device', 'role', 'tenant', 'platform', 'vcpus', 'memory', 'disk', 'description', 'comments',
|
||||
)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
# Set unit labels based on configured RAM_BASE_UNIT / DISK_BASE_UNIT (MB vs MiB)
|
||||
self.fields['memory'].label = _('Memory ({unit})').format(unit=get_capacity_unit_label(settings.RAM_BASE_UNIT))
|
||||
self.fields['disk'].label = _('Disk ({unit})').format(unit=get_capacity_unit_label(settings.DISK_BASE_UNIT))
|
||||
|
||||
|
||||
class VMInterfaceBulkEditForm(OwnerMixin, NetBoxModelBulkEditForm):
|
||||
virtual_machine = forms.ModelChoiceField(
|
||||
@@ -304,7 +313,7 @@ class VirtualDiskBulkEditForm(OwnerMixin, NetBoxModelBulkEditForm):
|
||||
)
|
||||
size = forms.IntegerField(
|
||||
required=False,
|
||||
label=_('Size (MB)')
|
||||
label=_('Size')
|
||||
)
|
||||
description = forms.CharField(
|
||||
label=_('Description'),
|
||||
@@ -318,6 +327,12 @@ class VirtualDiskBulkEditForm(OwnerMixin, NetBoxModelBulkEditForm):
|
||||
)
|
||||
nullable_fields = ('description',)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
# Set unit label based on configured DISK_BASE_UNIT (MB vs MiB)
|
||||
self.fields['size'].label = _('Size ({unit})').format(unit=get_capacity_unit_label(settings.DISK_BASE_UNIT))
|
||||
|
||||
|
||||
class VirtualDiskBulkRenameForm(BulkRenameForm):
|
||||
pk = forms.ModelMultipleChoiceField(
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
from django import forms
|
||||
from django.conf import settings
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from dcim.choices import *
|
||||
@@ -12,6 +13,7 @@ from tenancy.forms import ContactModelFilterForm, TenancyFilterForm
|
||||
from utilities.forms import BOOLEAN_WITH_BLANK_CHOICES
|
||||
from utilities.forms.fields import DynamicModelMultipleChoiceField, TagFilterField
|
||||
from utilities.forms.rendering import FieldSet
|
||||
from utilities.forms.utils import get_capacity_unit_label
|
||||
from virtualization.choices import *
|
||||
from virtualization.models import *
|
||||
from vpn.models import L2VPN
|
||||
@@ -281,8 +283,14 @@ class VirtualDiskFilterForm(OwnerFilterMixin, NetBoxModelFilterSetForm):
|
||||
label=_('Virtual machine')
|
||||
)
|
||||
size = forms.IntegerField(
|
||||
label=_('Size (MB)'),
|
||||
label=_('Size'),
|
||||
required=False,
|
||||
min_value=1
|
||||
)
|
||||
tag = TagFilterField(model)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
# Set unit label based on configured DISK_BASE_UNIT (MB vs MiB)
|
||||
self.fields['size'].label = _('Size ({unit})').format(unit=get_capacity_unit_label(settings.DISK_BASE_UNIT))
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from django import forms
|
||||
from django.apps import apps
|
||||
from django.conf import settings
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
@@ -16,6 +17,7 @@ from tenancy.forms import TenancyForm
|
||||
from utilities.forms import ConfirmationForm
|
||||
from utilities.forms.fields import DynamicModelChoiceField, DynamicModelMultipleChoiceField, JSONField
|
||||
from utilities.forms.rendering import FieldSet
|
||||
from utilities.forms.utils import get_capacity_unit_label
|
||||
from utilities.forms.widgets import HTMXSelect
|
||||
from virtualization.models import *
|
||||
|
||||
@@ -236,6 +238,10 @@ class VirtualMachineForm(TenancyForm, PrimaryModelForm):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
# Set unit labels based on configured RAM_BASE_UNIT / DISK_BASE_UNIT (MB vs MiB)
|
||||
self.fields['memory'].label = _('Memory ({unit})').format(unit=get_capacity_unit_label(settings.RAM_BASE_UNIT))
|
||||
self.fields['disk'].label = _('Disk ({unit})').format(unit=get_capacity_unit_label(settings.DISK_BASE_UNIT))
|
||||
|
||||
if self.instance.pk:
|
||||
|
||||
# Disable the disk field if one or more VirtualDisks have been created
|
||||
@@ -401,3 +407,9 @@ class VirtualDiskForm(VMComponentForm):
|
||||
fields = [
|
||||
'virtual_machine', 'name', 'size', 'description', 'owner', 'tags',
|
||||
]
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
# Set unit label based on configured DISK_BASE_UNIT (MB vs MiB)
|
||||
self.fields['size'].label = _('Size ({unit})').format(unit=get_capacity_unit_label(settings.DISK_BASE_UNIT))
|
||||
|
||||
@@ -121,12 +121,12 @@ class VirtualMachine(ContactsMixin, ImageAttachmentsMixin, RenderConfigMixin, Co
|
||||
memory = models.PositiveIntegerField(
|
||||
blank=True,
|
||||
null=True,
|
||||
verbose_name=_('memory (MB)')
|
||||
verbose_name=_('memory')
|
||||
)
|
||||
disk = models.PositiveIntegerField(
|
||||
blank=True,
|
||||
null=True,
|
||||
verbose_name=_('disk (MB)')
|
||||
verbose_name=_('disk')
|
||||
)
|
||||
serial = models.CharField(
|
||||
verbose_name=_('serial number'),
|
||||
@@ -425,7 +425,7 @@ class VMInterface(ComponentModel, BaseInterface, TrackingModelMixin):
|
||||
|
||||
class VirtualDisk(ComponentModel, TrackingModelMixin):
|
||||
size = models.PositiveIntegerField(
|
||||
verbose_name=_('size (MB)'),
|
||||
verbose_name=_('size'),
|
||||
)
|
||||
|
||||
class Meta(ComponentModel.Meta):
|
||||
|
||||
@@ -4,7 +4,7 @@ from django.utils.translation import gettext_lazy as _
|
||||
from dcim.tables.devices import BaseInterfaceTable
|
||||
from netbox.tables import NetBoxTable, PrimaryModelTable, columns
|
||||
from tenancy.tables import ContactsColumnMixin, TenancyColumnsMixin
|
||||
from utilities.templatetags.helpers import humanize_disk_megabytes
|
||||
from utilities.templatetags.helpers import humanize_disk_capacity, humanize_ram_capacity
|
||||
from virtualization.models import VirtualDisk, VirtualMachine, VMInterface
|
||||
|
||||
from .template_code import *
|
||||
@@ -93,8 +93,11 @@ class VirtualMachineTable(TenancyColumnsMixin, ContactsColumnMixin, PrimaryModel
|
||||
'pk', 'name', 'status', 'site', 'cluster', 'role', 'tenant', 'vcpus', 'memory', 'disk', 'primary_ip',
|
||||
)
|
||||
|
||||
def render_memory(self, value):
|
||||
return humanize_ram_capacity(value)
|
||||
|
||||
def render_disk(self, value):
|
||||
return humanize_disk_megabytes(value)
|
||||
return humanize_disk_capacity(value)
|
||||
|
||||
|
||||
#
|
||||
@@ -184,7 +187,7 @@ class VirtualDiskTable(NetBoxTable):
|
||||
}
|
||||
|
||||
def render_size(self, value):
|
||||
return humanize_disk_megabytes(value)
|
||||
return humanize_disk_capacity(value)
|
||||
|
||||
|
||||
class VirtualMachineVirtualDiskTable(VirtualDiskTable):
|
||||
|
||||
26
netbox/virtualization/tests/test_tables.py
Normal file
26
netbox/virtualization/tests/test_tables.py
Normal file
@@ -0,0 +1,26 @@
|
||||
from utilities.testing import TableTestCases
|
||||
from virtualization.tables import *
|
||||
|
||||
|
||||
class ClusterTypeTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = ClusterTypeTable
|
||||
|
||||
|
||||
class ClusterGroupTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = ClusterGroupTable
|
||||
|
||||
|
||||
class ClusterTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = ClusterTable
|
||||
|
||||
|
||||
class VirtualMachineTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = VirtualMachineTable
|
||||
|
||||
|
||||
class VMInterfaceTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = VMInterfaceTable
|
||||
|
||||
|
||||
class VirtualDiskTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = VirtualDiskTable
|
||||
@@ -66,7 +66,7 @@ class TunnelTable(TenancyColumnsMixin, ContactsColumnMixin, PrimaryModelTable):
|
||||
model = Tunnel
|
||||
fields = (
|
||||
'pk', 'id', 'name', 'group', 'status', 'encapsulation', 'ipsec_profile', 'tenant', 'tenant_group',
|
||||
'tunnel_id', 'termination_count', 'description', 'contacts', 'comments', 'tags', 'created',
|
||||
'tunnel_id', 'terminations_count', 'description', 'contacts', 'comments', 'tags', 'created',
|
||||
'last_updated',
|
||||
)
|
||||
default_columns = ('pk', 'name', 'group', 'status', 'encapsulation', 'tenant', 'terminations_count')
|
||||
|
||||
@@ -1,23 +1,42 @@
|
||||
from django.test import RequestFactory, TestCase, tag
|
||||
|
||||
from vpn.models import TunnelTermination
|
||||
from vpn.tables import TunnelTerminationTable
|
||||
from utilities.testing import TableTestCases
|
||||
from vpn.tables import *
|
||||
|
||||
|
||||
@tag('regression')
|
||||
class TunnelTerminationTableTest(TestCase):
|
||||
def test_every_orderable_field_does_not_throw_exception(self):
|
||||
terminations = TunnelTermination.objects.all()
|
||||
fake_request = RequestFactory().get("/")
|
||||
disallowed = {'actions'}
|
||||
class TunnelGroupTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = TunnelGroupTable
|
||||
|
||||
orderable_columns = [
|
||||
column.name for column in TunnelTerminationTable(terminations).columns
|
||||
if column.orderable and column.name not in disallowed
|
||||
]
|
||||
|
||||
for col in orderable_columns:
|
||||
for dir in ('-', ''):
|
||||
table = TunnelTerminationTable(terminations)
|
||||
table.order_by = f'{dir}{col}'
|
||||
table.as_html(fake_request)
|
||||
class TunnelTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = TunnelTable
|
||||
|
||||
|
||||
class TunnelTerminationTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = TunnelTerminationTable
|
||||
|
||||
|
||||
class IKEProposalTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = IKEProposalTable
|
||||
|
||||
|
||||
class IKEPolicyTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = IKEPolicyTable
|
||||
|
||||
|
||||
class IPSecProposalTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = IPSecProposalTable
|
||||
|
||||
|
||||
class IPSecPolicyTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = IPSecPolicyTable
|
||||
|
||||
|
||||
class IPSecProfileTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = IPSecProfileTable
|
||||
|
||||
|
||||
class L2VPNTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = L2VPNTable
|
||||
|
||||
|
||||
class L2VPNTerminationTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = L2VPNTerminationTable
|
||||
|
||||
14
netbox/wireless/tests/test_tables.py
Normal file
14
netbox/wireless/tests/test_tables.py
Normal file
@@ -0,0 +1,14 @@
|
||||
from utilities.testing import TableTestCases
|
||||
from wireless.tables import *
|
||||
|
||||
|
||||
class WirelessLANGroupTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = WirelessLANGroupTable
|
||||
|
||||
|
||||
class WirelessLANTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = WirelessLANTable
|
||||
|
||||
|
||||
class WirelessLinkTableTest(TableTestCases.OrderableColumnsTestCase):
|
||||
table = WirelessLinkTable
|
||||
@@ -10,7 +10,7 @@ django-pglocks==1.0.4
|
||||
django-prometheus==2.4.1
|
||||
django-redis==6.0.0
|
||||
django-rich==2.2.0
|
||||
django-rq==3.2.2
|
||||
django-rq==4.0.1
|
||||
django-storages==1.14.6
|
||||
django-tables2==2.8.0
|
||||
django-taggit==6.1.0
|
||||
|
||||
Reference in New Issue
Block a user