Compare commits

..

6 Commits

Author SHA1 Message Date
Jeremy Stretch
b49d58bb1a Closes #21300: Cache model-specific custom field lookups for the duration of a request 2026-01-30 16:34:04 -05:00
Jeremy Stretch
aa4a9da955 Closes #21303: Cache serialized post-change data on object (#21325)
* Closes #21303: Cache serialized post-change data on object

* Set to_objectchange.alters_data

* Restructure logic for determining post-change snapshot
2026-01-30 14:49:12 -05:00
Jeremy Stretch
5c6fc2fb6f Closes #21110: Support for cursor-based pagination in GraphQL API (#21322) 2026-01-30 11:45:35 -08:00
Jeremy Stretch
ad29cb2d66 Closes #21263: Prefetch related objects after creating/updating objects via REST API (#21329)
* Closes #21263: Prefetch related objects after creating/updating objects via REST API

* Add comment re: ordering by PK
2026-01-30 14:13:05 -05:00
Aditya Sharma
bec5ecf6a9 Closes #21209: Accept case-insensitive model names in configuration (#21275)
NetBox now accepts case-insensitive model identifiers in configuration, allowing
both lowercase (e.g. "dcim.site") and PascalCase (e.g. "dcim.Site") for
DEFAULT_DASHBOARD, CUSTOM_VALIDATORS, and PROTECTION_RULES.
This makes model name handling consistent with FIELD_CHOICES.

- Add a shared case-insensitive config lookup helper (get_config_value_ci())
- Use the helper in extras/signals.py and core/signals.py
- Update FIELD_CHOICES ChoiceSetMeta to support case-insensitive replace/extend
  (only compute extend choices if no replacement is defined)
- Add unit tests for get_config_value_ci()
- Add integration tests for case-insensitive FIELD_CHOICES replacement/extension
- Update documentation examples to use PascalCase consistently
2026-01-30 13:48:38 +01:00
github-actions
c98f55dbd2 Update source translation strings 2026-01-30 05:18:59 +00:00
27 changed files with 508 additions and 134 deletions

View File

@@ -8,7 +8,7 @@ This is a mapping of models to [custom validators](../customization/custom-valid
```python
CUSTOM_VALIDATORS = {
"dcim.site": [
"dcim.Site": [
{
"name": {
"min_length": 5,
@@ -17,12 +17,15 @@ CUSTOM_VALIDATORS = {
},
"my_plugin.validators.Validator1"
],
"dcim.device": [
"dcim.Device": [
"my_plugin.validators.Validator1"
]
}
```
!!! info "Case-Insensitive Model Names"
Model identifiers are case-insensitive. Both `dcim.site` and `dcim.Site` are valid and equivalent.
---
## FIELD_CHOICES
@@ -53,6 +56,9 @@ FIELD_CHOICES = {
}
```
!!! info "Case-Insensitive Field Identifiers"
Field identifiers are case-insensitive. Both `dcim.Site.status` and `dcim.site.status` are valid and equivalent.
The following model fields support configurable choices:
* `circuits.Circuit.status`
@@ -98,7 +104,7 @@ This is a mapping of models to [custom validators](../customization/custom-valid
```python
PROTECTION_RULES = {
"dcim.site": [
"dcim.Site": [
{
"status": {
"eq": "decommissioning"
@@ -108,3 +114,6 @@ PROTECTION_RULES = {
]
}
```
!!! info "Case-Insensitive Model Names"
Model identifiers are case-insensitive. Both `dcim.site` and `dcim.Site` are valid and equivalent.

View File

@@ -133,23 +133,67 @@ The field "class_type" is an easy way to distinguish what type of object it is w
## Pagination
Queries can be paginated by specifying pagination in the query and supplying an offset and optionaly a limit in the query. If no limit is given, a default of 100 is used. Queries are not paginated unless requested in the query. An example paginated query is shown below:
The GraphQL API supports two types of pagination. Offset-based pagination operates using an offset relative to the first record in a set, specified by the `offset` parameter. For example, the response to a request specifying an offset of 100 will contain the 101st and later matching records. Offset-based pagination feels very natural, but its performance can suffer when dealing with large data sets due to the overhead involved in calculating the relative offset.
The alternative approach is cursor-based pagination, which operates using absolute (rather than relative) primary key values. (These are the numeric IDs assigned to each object in the database.) When using cursor-based pagination, the response will contain records with a primary key greater than or equal to the specified start value, up to the maximum number of results. This strategy requires keeping track of the last seen primary key from each response when paginating through data, but is extremely performant. The cursor is specified by passing the starting object ID via the `start` parameter.
To ensure consistent ordering, objects will always be ordered by their primary keys when cursor-based pagination is used.
!!! note "Cursor-based pagination was introduced in NetBox v4.5.2."
Both pagination strategies support passing an optional `limit` parameter. In both approaches, this specifies the maximum number of objects to include in the response. If no limit is specified, a default value of 100 is used.
### Offset Pagination
The first page will have an `offset` of zero, or the `offset` parameter will be omitted:
```
query {
device_list(pagination: { offset: 0, limit: 20 }) {
device_list(pagination: {offset: 0, limit: 20}) {
id
}
}
```
The second page will have an offset equal to the size of the first page. If the number of records is less than the specified limit, there are no more records to process. For example, if a request specifies a `limit` of 20 but returns only 13 records, we can conclude that this is the final page of records.
```
query {
device_list(pagination: {offset: 20, limit: 20}) {
id
}
}
```
### Cursor Pagination
Set the `start` value to zero to fetch the first page. Note that if the `start` parameter is omitted, offset-based pagination will be used by default.
```
query {
device_list(pagination: {start: 0, limit: 20}) {
id
}
}
```
To determine the `start` value for the next page, add 1 to the primary key (`id`) of the last record in the previous page.
For example, if the ID of the last record in the previous response was 123, we would specify a `start` value of 124:
```
query {
device_list(pagination: {start: 124, limit: 20}) {
id
}
}
```
This will return up to 20 records with an ID greater than or equal to 124.
## Authentication
NetBox's GraphQL API uses the same API authentication tokens as its REST API. Authentication tokens are included with requests by attaching an `Authorization` HTTP header in the following form:
```
Authorization: Token $TOKEN
```
NetBox's GraphQL API uses the same API authentication tokens as its REST API. See the [REST API authentication](./rest-api.md#authentication) documentation for further detail.
## Disabling the GraphQL API

View File

@@ -18,6 +18,7 @@ from extras.events import enqueue_event
from extras.models import Tag
from extras.utils import run_validators
from netbox.config import get_config
from utilities.data import get_config_value_ci
from netbox.context import current_request, events_queue
from netbox.models.features import ChangeLoggingMixin, get_model_features, model_is_public
from utilities.exceptions import AbortRequest
@@ -168,7 +169,7 @@ def handle_deleted_object(sender, instance, **kwargs):
# to queueing any events for the object being deleted, in case a validation error is
# raised, causing the deletion to fail.
model_name = f'{sender._meta.app_label}.{sender._meta.model_name}'
validators = get_config().PROTECTION_RULES.get(model_name, [])
validators = get_config_value_ci(get_config().PROTECTION_RULES, model_name, default=[])
try:
run_validators(instance, validators)
except ValidationError as e:

View File

@@ -4,7 +4,6 @@ from drf_spectacular.utils import extend_schema_field
from rest_framework.fields import Field
from rest_framework.serializers import ValidationError
from core.models import ObjectType
from extras.choices import CustomFieldTypeChoices
from extras.constants import CUSTOMFIELD_EMPTY_VALUES
from extras.models import CustomField
@@ -25,8 +24,7 @@ class CustomFieldDefaultValues:
self.model = serializer_field.parent.Meta.model
# Retrieve the CustomFields for the parent model
object_type = ObjectType.objects.get_for_model(self.model)
fields = CustomField.objects.filter(object_types=object_type)
fields = CustomField.objects.get_for_model(self.model)
# Populate the default value for each CustomField
value = {}
@@ -47,8 +45,7 @@ class CustomFieldsDataField(Field):
Cache CustomFields assigned to this model to avoid redundant database queries
"""
if not hasattr(self, '_custom_fields'):
object_type = ObjectType.objects.get_for_model(self.parent.Meta.model)
self._custom_fields = CustomField.objects.filter(object_types=object_type)
self._custom_fields = CustomField.objects.get_for_model(self.parent.Meta.model)
return self._custom_fields
def to_representation(self, obj):

View File

@@ -75,10 +75,11 @@ def get_bookmarks_object_type_choices():
def get_models_from_content_types(content_types):
"""
Return a list of models corresponding to the given content types, identified by natural key.
Accepts both lowercase (e.g. "dcim.site") and PascalCase (e.g. "dcim.Site") model names.
"""
models = []
for content_type_id in content_types:
app_label, model_name = content_type_id.split('.')
app_label, model_name = content_type_id.lower().split('.')
try:
content_type = ObjectType.objects.get_by_natural_key(app_label, model_name)
if content_type.model_class():

View File

@@ -51,18 +51,26 @@ def serialize_for_event(instance):
def get_snapshots(instance, event_type):
snapshots = {
'prechange': getattr(instance, '_prechange_snapshot', None),
'postchange': None,
}
if event_type != OBJECT_DELETED:
# Use model's serialize_object() method if defined; fall back to serialize_object() utility function
if hasattr(instance, 'serialize_object'):
snapshots['postchange'] = instance.serialize_object()
else:
snapshots['postchange'] = serialize_object(instance)
"""
Return a dictionary of pre- and post-change snapshots for the given instance.
"""
if event_type == OBJECT_DELETED:
# Post-change snapshot must be empty for deleted objects
postchange_snapshot = None
elif hasattr(instance, '_postchange_snapshot'):
# Use the cached post-change snapshot if one is available
postchange_snapshot = instance._postchange_snapshot
elif hasattr(instance, 'serialize_object'):
# Use model's serialize_object() method if defined
postchange_snapshot = instance.serialize_object()
else:
# Fall back to the serialize_object() utility function
postchange_snapshot = serialize_object(instance)
return snapshots
return {
'prechange': getattr(instance, '_prechange_snapshot', None),
'postchange': postchange_snapshot,
}
def enqueue_event(queue, instance, request, event_type):

View File

@@ -19,6 +19,7 @@ from django.utils.translation import gettext_lazy as _
from core.models import ObjectType
from extras.choices import *
from extras.data import CHOICE_SETS
from netbox.context import query_cache
from netbox.models import ChangeLoggedModel
from netbox.models.features import CloningMixin, ExportTemplatesMixin
from netbox.models.mixins import OwnerMixin
@@ -58,8 +59,20 @@ class CustomFieldManager(models.Manager.from_queryset(RestrictedQuerySet)):
"""
Return all CustomFields assigned to the given model.
"""
# Check the request cache before hitting the database
cache = query_cache.get()
if cache is not None:
if custom_fields := cache['custom_fields'].get(model._meta.model):
return custom_fields
content_type = ObjectType.objects.get_for_model(model._meta.concrete_model)
return self.get_queryset().filter(object_types=content_type)
custom_fields = self.get_queryset().filter(object_types=content_type)
# Populate the request cache to avoid redundant lookups
if cache is not None:
cache['custom_fields'][model._meta.model] = custom_fields
return custom_fields
def get_defaults_for_model(self, model):
"""

View File

@@ -9,6 +9,7 @@ from extras.models import EventRule, Notification, Subscription
from netbox.config import get_config
from netbox.models.features import has_feature
from netbox.signals import post_clean
from utilities.data import get_config_value_ci
from utilities.exceptions import AbortRequest
from .models import CustomField, TaggedItem
from .utils import run_validators
@@ -65,7 +66,7 @@ def run_save_validators(sender, instance, **kwargs):
Run any custom validation rules for the model prior to calling save().
"""
model_name = f'{sender._meta.app_label}.{sender._meta.model_name}'
validators = get_config().CUSTOM_VALIDATORS.get(model_name, [])
validators = get_config_value_ci(get_config().CUSTOM_VALIDATORS, model_name, default=[])
run_validators(instance, validators)

View File

@@ -1,4 +1,3 @@
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ObjectDoesNotExist
from django.db.backends.postgresql.psycopg_any import NumericRange
from django.utils.translation import gettext as _
@@ -110,7 +109,7 @@ class ContentTypeField(RelatedField):
def to_internal_value(self, data):
try:
app_label, model = data.split('.')
return ContentType.objects.get_by_natural_key(app_label=app_label, model=model)
return self.queryset.get(app_label=app_label, model=model)
except ObjectDoesNotExist:
self.fail('does_not_exist', content_type=data)
except (AttributeError, TypeError, ValueError):

View File

@@ -170,6 +170,28 @@ class NetBoxModelViewSet(
# Creates
def create(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
bulk_create = getattr(serializer, 'many', False)
self.perform_create(serializer)
# After creating the instance(s), re-initialize the serializer with a queryset
# to ensure related objects are prefetched.
if bulk_create:
instance_pks = [obj.pk for obj in serializer.instance]
# Order by PK to ensure that the ordering of objects in the response
# matches the ordering of those in the request.
qs = self.get_queryset().filter(pk__in=instance_pks).order_by('pk')
else:
qs = self.get_queryset().get(pk=serializer.instance.pk)
# Re-serialize the instance(s) with prefetched data
serializer = self.get_serializer(qs, many=bulk_create)
headers = self.get_success_headers(serializer.data)
return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
def perform_create(self, serializer):
model = self.queryset.model
logger = logging.getLogger(f'netbox.api.views.{self.__class__.__name__}')
@@ -186,9 +208,20 @@ class NetBoxModelViewSet(
# Updates
def update(self, request, *args, **kwargs):
# Hotwire get_object() to ensure we save a pre-change snapshot
self.get_object = self.get_object_with_snapshot
return super().update(request, *args, **kwargs)
partial = kwargs.pop('partial', False)
instance = self.get_object_with_snapshot()
serializer = self.get_serializer(instance, data=request.data, partial=partial)
serializer.is_valid(raise_exception=True)
self.perform_update(serializer)
# After updating the instance, re-initialize the serializer with a queryset
# to ensure related objects are prefetched.
qs = self.get_queryset().get(pk=serializer.instance.pk)
# Re-serialize the instance(s) with prefetched data
serializer = self.get_serializer(qs)
return Response(serializer.data)
def perform_update(self, serializer):
model = self.queryset.model

View File

@@ -108,13 +108,17 @@ class BulkUpdateModelMixin:
obj.pop('id'): obj for obj in request.data
}
data = self.perform_bulk_update(qs, update_data, partial=partial)
object_pks = self.perform_bulk_update(qs, update_data, partial=partial)
return Response(data, status=status.HTTP_200_OK)
# Prefetch related objects for all updated instances
qs = self.get_queryset().filter(pk__in=object_pks)
serializer = self.get_serializer(qs, many=True)
return Response(serializer.data, status=status.HTTP_200_OK)
def perform_bulk_update(self, objects, update_data, partial):
updated_pks = []
with transaction.atomic(using=router.db_for_write(self.queryset.model)):
data_list = []
for obj in objects:
data = update_data.get(obj.id)
if hasattr(obj, 'snapshot'):
@@ -122,9 +126,9 @@ class BulkUpdateModelMixin:
serializer = self.get_serializer(obj, data=data, partial=partial)
serializer.is_valid(raise_exception=True)
self.perform_update(serializer)
data_list.append(serializer.data)
updated_pks.append(obj.pk)
return data_list
return updated_pks
def bulk_partial_update(self, request, *args, **kwargs):
kwargs['partial'] = True

View File

@@ -306,11 +306,10 @@ class NetBoxModelFilterSet(ChangeLoggedModelFilterSet):
super().__init__(*args, **kwargs)
# Dynamically add a Filter for each CustomField applicable to the parent model
custom_fields = CustomField.objects.filter(
object_types=ContentType.objects.get_for_model(self._meta.model)
).exclude(
filter_logic=CustomFieldFilterLogicChoices.FILTER_DISABLED
)
custom_fields = [
cf for cf in CustomField.objects.get_for_model(self._meta.model)
if cf.filter_logic != CustomFieldFilterLogicChoices.FILTER_DISABLED
]
custom_field_filters = {}
for custom_field in custom_fields:

View File

@@ -31,10 +31,10 @@ class NetBoxModelImportForm(CSVModelForm, NetBoxModelForm):
)
def _get_custom_fields(self, content_type):
return CustomField.objects.filter(
object_types=content_type,
ui_editable=CustomFieldUIEditableChoices.YES
)
return [
cf for cf in CustomField.objects.get_for_model(content_type.model_class())
if cf.ui_editable == CustomFieldUIEditableChoices.YES
]
def _get_form_field(self, customfield):
return customfield.to_form_field(for_csv_import=True)

View File

@@ -1,5 +1,4 @@
from django import forms
from django.db.models import Q
from django.utils.translation import gettext_lazy as _
from extras.choices import *
@@ -35,10 +34,12 @@ class NetBoxModelFilterSetForm(FilterModifierMixin, CustomFieldsMixin, SavedFilt
selector_fields = ('filter_id', 'q')
def _get_custom_fields(self, content_type):
return super()._get_custom_fields(content_type).exclude(
Q(filter_logic=CustomFieldFilterLogicChoices.FILTER_DISABLED) |
Q(type=CustomFieldTypeChoices.TYPE_JSON)
)
return [
cf for cf in super()._get_custom_fields(content_type) if (
cf.filter_logic != CustomFieldFilterLogicChoices.FILTER_DISABLED and
cf.type != CustomFieldTypeChoices.TYPE_JSON
)
]
def _get_form_field(self, customfield):
return customfield.to_form_field(

View File

@@ -65,9 +65,10 @@ class CustomFieldsMixin:
return ObjectType.objects.get_for_model(self.model)
def _get_custom_fields(self, content_type):
return CustomField.objects.filter(object_types=content_type).exclude(
ui_editable=CustomFieldUIEditableChoices.HIDDEN
)
return [
cf for cf in CustomField.objects.get_for_model(content_type.model_class())
if cf.ui_editable != CustomFieldUIEditableChoices.HIDDEN
]
def _get_form_field(self, customfield):
return customfield.to_form_field()

View File

@@ -0,0 +1,50 @@
import strawberry
from strawberry.types.unset import UNSET
from strawberry_django.pagination import _QS, apply
__all__ = (
'OffsetPaginationInfo',
'OffsetPaginationInput',
'apply_pagination',
)
@strawberry.type
class OffsetPaginationInfo:
offset: int = 0
limit: int | None = UNSET
start: int | None = UNSET
@strawberry.input
class OffsetPaginationInput(OffsetPaginationInfo):
"""
Customized implementation of OffsetPaginationInput to support cursor-based pagination.
"""
pass
def apply_pagination(
self,
queryset: _QS,
pagination: OffsetPaginationInput | None = None,
*,
related_field_id: str | None = None,
) -> _QS:
"""
Replacement for the `apply_pagination()` method on StrawberryDjangoField to support cursor-based pagination.
"""
if pagination is not None and pagination.start not in (None, UNSET):
if pagination.offset:
raise ValueError('Cannot specify both `start` and `offset` in pagination.')
if pagination.start < 0:
raise ValueError('`start` must be greater than or equal to zero.')
# Filter the queryset to include only records with a primary key greater than or equal to the start value,
# and force ordering by primary key to ensure consistent pagination across all records.
queryset = queryset.filter(pk__gte=pagination.start).order_by('pk')
# Ignore `offset` when `start` is set
pagination.offset = 0
return apply(pagination, queryset, related_field_id=related_field_id)

View File

@@ -121,9 +121,11 @@ class ChangeLoggingMixin(DeleteMixin, models.Model):
if hasattr(self, '_prechange_snapshot'):
objectchange.prechange_data = self._prechange_snapshot
if action in (ObjectChangeActionChoices.ACTION_CREATE, ObjectChangeActionChoices.ACTION_UPDATE):
objectchange.postchange_data = self.serialize_object(exclude=exclude)
self._postchange_snapshot = self.serialize_object(exclude=exclude)
objectchange.postchange_data = self._postchange_snapshot
return objectchange
to_objectchange.alters_data = True
class CloningMixin(models.Model):
@@ -317,9 +319,11 @@ class CustomFieldsMixin(models.Model):
raise ValidationError(_("Missing required custom field '{name}'.").format(name=cf.name))
def save(self, *args, **kwargs):
from extras.models import CustomField
# Populate default values if omitted
for cf in self.custom_fields.filter(default__isnull=False):
if cf.name not in self.custom_field_data:
for cf in CustomField.objects.get_for_model(self):
if cf.name not in self.custom_field_data and cf.default is not None:
self.custom_field_data[cf.name] = cf.default
super().save(*args, **kwargs)

View File

@@ -208,9 +208,12 @@ class CachedValueSearchBackend(SearchBackend):
except KeyError:
break
# Prefetch any associated custom fields
# Prefetch any associated custom fields (excluding those with a zero search weight)
object_type = ObjectType.objects.get_for_model(indexer.model)
custom_fields = CustomField.objects.filter(object_types=object_type).exclude(search_weight=0)
custom_fields = [
cf for cf in CustomField.objects.get_for_model(indexer.model)
if cf.search_weight > 0
]
# Wipe out any previously cached values for the object
if remove_existing:

View File

@@ -12,10 +12,13 @@ from django.core.validators import URLValidator
from django.utils.module_loading import import_string
from django.utils.translation import gettext_lazy as _
from rest_framework.utils import field_mapping
from strawberry_django import pagination
from strawberry_django.fields.field import StrawberryDjangoField
from core.exceptions import IncompatiblePluginError
from netbox.config import PARAMS as CONFIG_PARAMS
from netbox.constants import RQ_QUEUE_DEFAULT, RQ_QUEUE_HIGH, RQ_QUEUE_LOW
from netbox.graphql.pagination import OffsetPaginationInput, apply_pagination
from netbox.plugins import PluginConfig
from netbox.registry import registry
import storages.utils # type: ignore
@@ -33,6 +36,12 @@ from .monkey import get_unique_validators
# Override DRF's get_unique_validators() function with our own (see bug #19302)
field_mapping.get_unique_validators = get_unique_validators
# Override strawberry-django's OffsetPaginationInput class to add the `start` parameter
pagination.OffsetPaginationInput = OffsetPaginationInput
# Patch StrawberryDjangoField to use our custom `apply_pagination()` method with support for cursor-based pagination
StrawberryDjangoField.apply_pagination = apply_pagination
#
# Environment setup

View File

@@ -244,9 +244,10 @@ class NetBoxTable(BaseTable):
# Add custom field & custom link columns
object_type = ObjectType.objects.get_for_model(self._meta.model)
custom_fields = CustomField.objects.filter(
object_types=object_type
).exclude(ui_visible=CustomFieldUIVisibleChoices.HIDDEN)
custom_fields = [
cf for cf in CustomField.objects.get_for_model(self._meta.model)
if cf.ui_visible != CustomFieldUIVisibleChoices.HIDDEN
]
extra_columns.extend([
(f'cf_{cf.name}', columns.CustomFieldColumn(cf)) for cf in custom_fields
])

View File

@@ -4,10 +4,8 @@ from django.test import override_settings
from django.urls import reverse
from rest_framework import status
from core.models import ObjectType
from dcim.choices import LocationStatusChoices
from dcim.models import Site, Location
from users.models import ObjectPermission
from utilities.testing import disable_warnings, APITestCase, TestCase
@@ -45,17 +43,28 @@ class GraphQLTestCase(TestCase):
class GraphQLAPITestCase(APITestCase):
@classmethod
def setUpTestData(cls):
sites = (
Site(name='Site 1', slug='site-1'),
Site(name='Site 2', slug='site-2'),
Site(name='Site 3', slug='site-3'),
Site(name='Site 4', slug='site-4'),
Site(name='Site 5', slug='site-5'),
Site(name='Site 6', slug='site-6'),
Site(name='Site 7', slug='site-7'),
)
Site.objects.bulk_create(sites)
@override_settings(LOGIN_REQUIRED=True)
def test_graphql_filter_objects(self):
"""
Test the operation of filters for GraphQL API requests.
"""
sites = (
Site(name='Site 1', slug='site-1'),
Site(name='Site 2', slug='site-2'),
Site(name='Site 3', slug='site-3'),
)
Site.objects.bulk_create(sites)
self.add_permissions('dcim.view_site', 'dcim.view_location')
url = reverse('graphql')
sites = Site.objects.all()[:3]
Location.objects.create(
site=sites[0],
name='Location 1',
@@ -75,18 +84,6 @@ class GraphQLAPITestCase(APITestCase):
status=LocationStatusChoices.STATUS_ACTIVE
),
# Add object-level permission
obj_perm = ObjectPermission(
name='Test permission',
actions=['view']
)
obj_perm.save()
obj_perm.users.add(self.user)
obj_perm.object_types.add(ObjectType.objects.get_for_model(Location))
obj_perm.object_types.add(ObjectType.objects.get_for_model(Site))
url = reverse('graphql')
# A valid request should return the filtered list
query = '{location_list(filters: {site_id: "' + str(sites[0].pk) + '"}) {id site {id}}}'
response = self.client.post(url, data={'query': query}, format="json", **self.header)
@@ -133,10 +130,136 @@ class GraphQLAPITestCase(APITestCase):
self.assertEqual(len(data['data']['location_list']), 0)
# Removing the permissions from location should result in an empty locations list
obj_perm.object_types.remove(ObjectType.objects.get_for_model(Location))
self.remove_permissions('dcim.view_location')
query = '{site(id: ' + str(sites[0].pk) + ') {id locations {id}}}'
response = self.client.post(url, data={'query': query}, format="json", **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
data = json.loads(response.content)
self.assertNotIn('errors', data)
self.assertEqual(len(data['data']['site']['locations']), 0)
def test_offset_pagination(self):
self.add_permissions('dcim.view_site')
url = reverse('graphql')
# Test `limit` only
query = """
{
site_list(pagination: {limit: 3}) {
id name
}
}
"""
response = self.client.post(url, data={'query': query}, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
data = json.loads(response.content)
self.assertNotIn('errors', data)
self.assertEqual(len(data['data']['site_list']), 3)
self.assertEqual(data['data']['site_list'][0]['name'], 'Site 1')
self.assertEqual(data['data']['site_list'][1]['name'], 'Site 2')
self.assertEqual(data['data']['site_list'][2]['name'], 'Site 3')
# Test `offset` only
query = """
{
site_list(pagination: {offset: 3}) {
id name
}
}
"""
response = self.client.post(url, data={'query': query}, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
data = json.loads(response.content)
self.assertNotIn('errors', data)
self.assertEqual(len(data['data']['site_list']), 4)
self.assertEqual(data['data']['site_list'][0]['name'], 'Site 4')
self.assertEqual(data['data']['site_list'][1]['name'], 'Site 5')
self.assertEqual(data['data']['site_list'][2]['name'], 'Site 6')
self.assertEqual(data['data']['site_list'][3]['name'], 'Site 7')
# Test `offset` & `limit`
query = """
{
site_list(pagination: {offset: 3, limit: 3}) {
id name
}
}
"""
response = self.client.post(url, data={'query': query}, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
data = json.loads(response.content)
self.assertNotIn('errors', data)
self.assertEqual(len(data['data']['site_list']), 3)
self.assertEqual(data['data']['site_list'][0]['name'], 'Site 4')
self.assertEqual(data['data']['site_list'][1]['name'], 'Site 5')
self.assertEqual(data['data']['site_list'][2]['name'], 'Site 6')
def test_cursor_pagination(self):
self.add_permissions('dcim.view_site')
url = reverse('graphql')
# Page 1
query = """
{
site_list(pagination: {start: 0, limit: 3}) {
id name
}
}
"""
response = self.client.post(url, data={'query': query}, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
data = json.loads(response.content)
self.assertNotIn('errors', data)
self.assertEqual(len(data['data']['site_list']), 3)
self.assertEqual(data['data']['site_list'][0]['name'], 'Site 1')
self.assertEqual(data['data']['site_list'][1]['name'], 'Site 2')
self.assertEqual(data['data']['site_list'][2]['name'], 'Site 3')
# Page 2
start_id = int(data['data']['site_list'][-1]['id']) + 1
query = """
{
site_list(pagination: {start: """ + str(start_id) + """, limit: 3}) {
id name
}
}
"""
response = self.client.post(url, data={'query': query}, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
data = json.loads(response.content)
self.assertNotIn('errors', data)
self.assertEqual(len(data['data']['site_list']), 3)
self.assertEqual(data['data']['site_list'][0]['name'], 'Site 4')
self.assertEqual(data['data']['site_list'][1]['name'], 'Site 5')
self.assertEqual(data['data']['site_list'][2]['name'], 'Site 6')
# Page 3
start_id = int(data['data']['site_list'][-1]['id']) + 1
query = """
{
site_list(pagination: {start: """ + str(start_id) + """, limit: 3}) {
id name
}
}
"""
response = self.client.post(url, data={'query': query}, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
data = json.loads(response.content)
self.assertNotIn('errors', data)
self.assertEqual(len(data['data']['site_list']), 1)
self.assertEqual(data['data']['site_list'][0]['name'], 'Site 7')
def test_pagination_conflict(self):
url = reverse('graphql')
query = """
{
site_list(pagination: {start: 1, offset: 1}) {
id name
}
}
"""
response = self.client.post(url, data={'query': query}, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
data = json.loads(response.content)
self.assertIn('errors', data)
self.assertEqual(data['errors'][0]['message'], 'Cannot specify both `start` and `offset` in pagination.')

View File

@@ -5,7 +5,6 @@ from copy import deepcopy
from django.contrib import messages
from django.contrib.contenttypes.fields import GenericForeignKey, GenericRel
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import FieldDoesNotExist, ObjectDoesNotExist, ValidationError
from django.db import IntegrityError, router, transaction
from django.db.models import ManyToManyField, ProtectedError, RestrictedError
@@ -485,10 +484,10 @@ class BulkImportView(GetReturnURLMixin, BaseMultiObjectView):
instance = self.queryset.model()
# For newly created objects, apply any default custom field values
custom_fields = CustomField.objects.filter(
object_types=ContentType.objects.get_for_model(self.queryset.model),
ui_editable=CustomFieldUIEditableChoices.YES
)
custom_fields = [
cf for cf in CustomField.objects.get_for_model(self.queryset.model)
if cf.ui_editable == CustomFieldUIEditableChoices.YES
]
for cf in custom_fields:
field_name = f'cf_{cf.name}'
if field_name not in record:

View File

@@ -8,7 +8,7 @@ msgid ""
msgstr ""
"Project-Id-Version: PACKAGE VERSION\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2026-01-29 05:16+0000\n"
"POT-Creation-Date: 2026-01-30 05:18+0000\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
@@ -293,8 +293,8 @@ msgstr ""
#: netbox/circuits/filtersets.py:278 netbox/circuits/filtersets.py:382
#: netbox/circuits/filtersets.py:547 netbox/core/filtersets.py:85
#: netbox/core/filtersets.py:150 netbox/core/filtersets.py:176
#: netbox/core/filtersets.py:216 netbox/dcim/filtersets.py:810
#: netbox/core/filtersets.py:154 netbox/core/filtersets.py:180
#: netbox/core/filtersets.py:220 netbox/dcim/filtersets.py:810
#: netbox/dcim/filtersets.py:1568 netbox/dcim/filtersets.py:2692
#: netbox/extras/filtersets.py:48 netbox/extras/filtersets.py:71
#: netbox/extras/filtersets.py:101 netbox/extras/filtersets.py:142
@@ -763,7 +763,7 @@ msgstr ""
#: netbox/circuits/forms/filtersets.py:132
#: netbox/circuits/forms/filtersets.py:322
#: netbox/circuits/forms/filtersets.py:338 netbox/core/forms/filtersets.py:75
#: netbox/core/forms/filtersets.py:143 netbox/dcim/forms/bulk_edit.py:818
#: netbox/core/forms/filtersets.py:147 netbox/dcim/forms/bulk_edit.py:818
#: netbox/dcim/forms/bulk_import.py:480 netbox/dcim/forms/filtersets.py:199
#: netbox/dcim/forms/filtersets.py:232 netbox/dcim/forms/filtersets.py:1012
#: netbox/dcim/forms/filtersets.py:1155 netbox/dcim/forms/filtersets.py:1285
@@ -2189,7 +2189,7 @@ msgstr ""
msgid "Data source (name)"
msgstr ""
#: netbox/core/filtersets.py:186 netbox/dcim/filtersets.py:521
#: netbox/core/filtersets.py:190 netbox/dcim/filtersets.py:521
#: netbox/extras/filtersets.py:302 netbox/extras/filtersets.py:355
#: netbox/extras/filtersets.py:401 netbox/extras/filtersets.py:424
#: netbox/extras/filtersets.py:490 netbox/users/filtersets.py:31
@@ -2197,7 +2197,7 @@ msgstr ""
msgid "User (ID)"
msgstr ""
#: netbox/core/filtersets.py:192
#: netbox/core/filtersets.py:196
msgid "User name"
msgstr ""
@@ -2277,7 +2277,7 @@ msgstr ""
msgid "Creation"
msgstr ""
#: netbox/core/forms/filtersets.py:82 netbox/core/forms/filtersets.py:168
#: netbox/core/forms/filtersets.py:82 netbox/core/forms/filtersets.py:172
#: netbox/extras/forms/filtersets.py:577 netbox/extras/tables/tables.py:271
#: netbox/extras/tables/tables.py:338 netbox/extras/tables/tables.py:364
#: netbox/extras/tables/tables.py:383 netbox/extras/tables/tables.py:415
@@ -2288,39 +2288,44 @@ msgstr ""
msgid "Object Type"
msgstr ""
#: netbox/core/forms/filtersets.py:92
#: netbox/core/forms/filtersets.py:92 netbox/core/tables/jobs.py:46
#: netbox/templates/core/job.html:63 netbox/templates/core/rq_task.html:61
msgid "Queue"
msgstr ""
#: netbox/core/forms/filtersets.py:96
msgid "Created after"
msgstr ""
#: netbox/core/forms/filtersets.py:97
#: netbox/core/forms/filtersets.py:101
msgid "Created before"
msgstr ""
#: netbox/core/forms/filtersets.py:102
#: netbox/core/forms/filtersets.py:106
msgid "Scheduled after"
msgstr ""
#: netbox/core/forms/filtersets.py:107
#: netbox/core/forms/filtersets.py:111
msgid "Scheduled before"
msgstr ""
#: netbox/core/forms/filtersets.py:112
#: netbox/core/forms/filtersets.py:116
msgid "Started after"
msgstr ""
#: netbox/core/forms/filtersets.py:117
#: netbox/core/forms/filtersets.py:121
msgid "Started before"
msgstr ""
#: netbox/core/forms/filtersets.py:122
#: netbox/core/forms/filtersets.py:126
msgid "Completed after"
msgstr ""
#: netbox/core/forms/filtersets.py:127
#: netbox/core/forms/filtersets.py:131
msgid "Completed before"
msgstr ""
#: netbox/core/forms/filtersets.py:134 netbox/core/forms/filtersets.py:163
#: netbox/core/forms/filtersets.py:138 netbox/core/forms/filtersets.py:167
#: netbox/dcim/forms/bulk_edit.py:455 netbox/dcim/forms/filtersets.py:509
#: netbox/dcim/forms/model_forms.py:326 netbox/extras/forms/filtersets.py:572
#: netbox/extras/forms/filtersets.py:592 netbox/extras/tables/tables.py:391
@@ -2336,22 +2341,22 @@ msgstr ""
msgid "User"
msgstr ""
#: netbox/core/forms/filtersets.py:142 netbox/core/tables/change_logging.py:15
#: netbox/core/tables/jobs.py:69 netbox/extras/tables/tables.py:773
#: netbox/core/forms/filtersets.py:146 netbox/core/tables/change_logging.py:15
#: netbox/core/tables/jobs.py:72 netbox/extras/tables/tables.py:773
#: netbox/extras/tables/tables.py:828
#: netbox/templates/core/objectchange.html:32
msgid "Time"
msgstr ""
#: netbox/core/forms/filtersets.py:147 netbox/extras/forms/filtersets.py:561
#: netbox/core/forms/filtersets.py:151 netbox/extras/forms/filtersets.py:561
msgid "After"
msgstr ""
#: netbox/core/forms/filtersets.py:152 netbox/extras/forms/filtersets.py:566
#: netbox/core/forms/filtersets.py:156 netbox/extras/forms/filtersets.py:566
msgid "Before"
msgstr ""
#: netbox/core/forms/filtersets.py:156 netbox/core/tables/change_logging.py:29
#: netbox/core/forms/filtersets.py:160 netbox/core/tables/change_logging.py:29
#: netbox/extras/forms/model_forms.py:484
#: netbox/templates/core/objectchange.html:46
#: netbox/templates/extras/eventrule.html:71
@@ -2721,28 +2726,36 @@ msgid "job ID"
msgstr ""
#: netbox/core/models/jobs.py:116
msgid "queue name"
msgstr ""
#: netbox/core/models/jobs.py:119
msgid "Name of the queue in which this job was enqueued"
msgstr ""
#: netbox/core/models/jobs.py:122
msgid "log entries"
msgstr ""
#: netbox/core/models/jobs.py:132
#: netbox/core/models/jobs.py:138
msgid "job"
msgstr ""
#: netbox/core/models/jobs.py:133
#: netbox/core/models/jobs.py:139
msgid "jobs"
msgstr ""
#: netbox/core/models/jobs.py:163
#: netbox/core/models/jobs.py:169
#, python-brace-format
msgid "Jobs cannot be assigned to this object type ({type})."
msgstr ""
#: netbox/core/models/jobs.py:216
#: netbox/core/models/jobs.py:226
#, python-brace-format
msgid "Invalid status for job termination. Choices are: {choices}"
msgstr ""
#: netbox/core/models/jobs.py:273
#: netbox/core/models/jobs.py:283
msgid ""
"enqueue() cannot be called with values for both schedule_at and immediate."
msgstr ""
@@ -2788,7 +2801,7 @@ msgstr ""
msgid "Request ID"
msgstr ""
#: netbox/core/tables/change_logging.py:45 netbox/core/tables/jobs.py:76
#: netbox/core/tables/change_logging.py:45 netbox/core/tables/jobs.py:79
#: netbox/extras/tables/tables.py:784 netbox/extras/tables/tables.py:841
#: netbox/templates/core/objectchange.html:68
msgid "Message"
@@ -2831,16 +2844,16 @@ msgstr ""
msgid "Interval"
msgstr ""
#: netbox/core/tables/jobs.py:46
#: netbox/core/tables/jobs.py:49
msgid "Log Entries"
msgstr ""
#: netbox/core/tables/jobs.py:73 netbox/extras/tables/tables.py:778
#: netbox/core/tables/jobs.py:76 netbox/extras/tables/tables.py:778
#: netbox/extras/tables/tables.py:832
msgid "Level"
msgstr ""
#: netbox/core/tables/jobs.py:80
#: netbox/core/tables/jobs.py:83
msgid "No log entries"
msgstr ""
@@ -8873,7 +8886,7 @@ msgstr ""
#: netbox/extras/forms/filtersets.py:176 netbox/extras/forms/filtersets.py:377
#: netbox/extras/forms/filtersets.py:400 netbox/extras/forms/filtersets.py:496
#: netbox/extras/forms/model_forms.py:690 netbox/templates/core/job.html:69
#: netbox/extras/forms/model_forms.py:690 netbox/templates/core/job.html:73
#: netbox/templates/extras/eventrule.html:84
msgid "Data"
msgstr ""
@@ -13574,10 +13587,6 @@ msgstr ""
msgid "Enqueue"
msgstr ""
#: netbox/templates/core/rq_task.html:61
msgid "Queue"
msgstr ""
#: netbox/templates/core/rq_task.html:65
msgid "Timeout"
msgstr ""

View File

@@ -3,6 +3,7 @@ import enum
from django.conf import settings
from django.utils.translation import gettext_lazy as _
from utilities.data import get_config_value_ci
from utilities.string import enum_key
__all__ = (
@@ -24,13 +25,14 @@ class ChoiceSetMeta(type):
).format(name=name)
app = attrs['__module__'].split('.', 1)[0]
replace_key = f'{app}.{key}'
extend_key = f'{replace_key}+' if replace_key else None
if replace_key and replace_key in settings.FIELD_CHOICES:
# Replace the stock choices
attrs['CHOICES'] = settings.FIELD_CHOICES[replace_key]
elif extend_key and extend_key in settings.FIELD_CHOICES:
# Extend the stock choices
attrs['CHOICES'].extend(settings.FIELD_CHOICES[extend_key])
replace_choices = get_config_value_ci(settings.FIELD_CHOICES, replace_key)
if replace_choices is not None:
attrs['CHOICES'] = replace_choices
else:
extend_key = f'{replace_key}+'
extend_choices = get_config_value_ci(settings.FIELD_CHOICES, extend_key)
if extend_choices is not None:
attrs['CHOICES'].extend(extend_choices)
# Define choice tuples and color maps
attrs['_choices'] = []

View File

@@ -10,6 +10,7 @@ __all__ = (
'deepmerge',
'drange',
'flatten_dict',
'get_config_value_ci',
'ranges_to_string',
'ranges_to_string_list',
'resolve_attr_path',
@@ -22,6 +23,19 @@ __all__ = (
# Dictionary utilities
#
def get_config_value_ci(config_dict, key, default=None):
"""
Retrieve a value from a dictionary using case-insensitive key matching.
"""
if key in config_dict:
return config_dict[key]
key_lower = key.lower()
for config_key, value in config_dict.items():
if config_key.lower() == key_lower:
return value
return default
def deepmerge(original, new):
"""
Deep merge two dictionaries (new into original) and return a new dict

View File

@@ -1,4 +1,4 @@
from django.test import TestCase
from django.test import TestCase, override_settings
from utilities.choices import ChoiceSet
@@ -30,3 +30,29 @@ class ChoiceSetTestCase(TestCase):
def test_values(self):
self.assertListEqual(ExampleChoices.values(), ['a', 'b', 'c', 1, 2, 3])
class FieldChoicesCaseInsensitiveTestCase(TestCase):
"""
Integration tests for FIELD_CHOICES case-insensitive key lookup.
"""
def test_replace_choices_with_different_casing(self):
"""Test that replacement works when config key casing differs."""
# Config uses lowercase, but code constructs PascalCase key
with override_settings(FIELD_CHOICES={'utilities.teststatus': [('new', 'New')]}):
class TestStatusChoices(ChoiceSet):
key = 'TestStatus' # Code will look up 'utilities.TestStatus'
CHOICES = [('old', 'Old')]
self.assertEqual(TestStatusChoices.CHOICES, [('new', 'New')])
def test_extend_choices_with_different_casing(self):
"""Test that extension works with the + suffix under casing differences."""
# Config uses lowercase with + suffix
with override_settings(FIELD_CHOICES={'utilities.teststatus+': [('extra', 'Extra')]}):
class TestStatusChoices(ChoiceSet):
key = 'TestStatus' # Code will look up 'utilities.TestStatus+'
CHOICES = [('base', 'Base')]
self.assertEqual(TestStatusChoices.CHOICES, [('base', 'Base'), ('extra', 'Extra')])

View File

@@ -2,6 +2,7 @@ from django.db.backends.postgresql.psycopg_any import NumericRange
from django.test import TestCase
from utilities.data import (
check_ranges_overlap,
get_config_value_ci,
ranges_to_string,
ranges_to_string_list,
string_to_ranges,
@@ -96,3 +97,25 @@ class RangeFunctionsTestCase(TestCase):
string_to_ranges('2-10, a-b'),
None # Fails to convert
)
class GetConfigValueCITestCase(TestCase):
def test_exact_match(self):
config = {'dcim.site': 'value1', 'dcim.Device': 'value2'}
self.assertEqual(get_config_value_ci(config, 'dcim.site'), 'value1')
self.assertEqual(get_config_value_ci(config, 'dcim.Device'), 'value2')
def test_case_insensitive_match(self):
config = {'dcim.Site': 'value1', 'ipam.IPAddress': 'value2'}
self.assertEqual(get_config_value_ci(config, 'dcim.site'), 'value1')
self.assertEqual(get_config_value_ci(config, 'ipam.ipaddress'), 'value2')
def test_default_value(self):
config = {'dcim.site': 'value1'}
self.assertIsNone(get_config_value_ci(config, 'nonexistent'))
self.assertEqual(get_config_value_ci(config, 'nonexistent', default=[]), [])
def test_empty_dict(self):
self.assertIsNone(get_config_value_ci({}, 'any.key'))
self.assertEqual(get_config_value_ci({}, 'any.key', default=[]), [])