mirror of
https://github.com/netbox-community/netbox.git
synced 2026-04-11 11:47:08 +02:00
Add nat_inside and nat_outside fields to primary_ip, primary_ip4, primary_ip6, and oob_ip on Device and VirtualMachine serializers. Update prefetch logic to honor field-level constraints on nested serializers and add test coverage for NAT field inclusion. Fixes #19138
232 lines
8.2 KiB
Python
232 lines
8.2 KiB
Python
from django.contrib.contenttypes.fields import GenericForeignKey
|
|
from django.core.exceptions import (
|
|
FieldDoesNotExist,
|
|
FieldError,
|
|
MultipleObjectsReturned,
|
|
ObjectDoesNotExist,
|
|
ValidationError,
|
|
)
|
|
from django.db.models.fields.related import ManyToOneRel, RelatedField
|
|
from django.urls import reverse
|
|
from django.utils.module_loading import import_string
|
|
from django.utils.translation import gettext_lazy as _
|
|
from rest_framework.permissions import BasePermission
|
|
from rest_framework.serializers import ListSerializer, Serializer
|
|
from rest_framework.views import get_view_name as drf_get_view_name
|
|
|
|
from extras.constants import HTTP_CONTENT_TYPE_JSON
|
|
from netbox.api.exceptions import GraphQLTypeNotFound, SerializerNotFound
|
|
from netbox.api.fields import RelatedObjectCountField
|
|
|
|
from .query import count_related, dict_to_filter_params
|
|
from .string import title
|
|
|
|
__all__ = (
|
|
'IsSuperuser',
|
|
'get_annotations_for_serializer',
|
|
'get_graphql_type_for_model',
|
|
'get_prefetches_for_serializer',
|
|
'get_related_object_by_attrs',
|
|
'get_serializer_for_model',
|
|
'get_view_name',
|
|
'is_api_request',
|
|
'is_graphql_request',
|
|
)
|
|
|
|
|
|
class IsSuperuser(BasePermission):
|
|
"""
|
|
Allows access only to superusers.
|
|
"""
|
|
def has_permission(self, request, view):
|
|
return bool(request.user and request.user.is_superuser)
|
|
|
|
|
|
def get_serializer_for_model(model, prefix=''):
|
|
"""
|
|
Return the appropriate REST API serializer for the given model.
|
|
"""
|
|
app_label, model_name = model._meta.label.split('.')
|
|
serializer_name = f'{app_label}.api.serializers.{prefix}{model_name}Serializer'
|
|
try:
|
|
return import_string(serializer_name)
|
|
except ImportError:
|
|
raise SerializerNotFound(
|
|
f"Could not determine serializer for {app_label}.{model_name} with prefix '{prefix}'"
|
|
)
|
|
|
|
|
|
def get_graphql_type_for_model(model):
|
|
"""
|
|
Return the GraphQL type class for the given model.
|
|
"""
|
|
app_label, model_name = model._meta.label.split('.')
|
|
class_name = f'{app_label}.graphql.types.{model_name}Type'
|
|
try:
|
|
return import_string(class_name)
|
|
except ImportError:
|
|
raise GraphQLTypeNotFound(f"Could not find GraphQL type for {app_label}.{model_name}")
|
|
|
|
|
|
def is_api_request(request):
|
|
"""
|
|
Return True of the request is being made via the REST API.
|
|
"""
|
|
return request.path_info.startswith(reverse('api-root'))
|
|
|
|
|
|
def is_graphql_request(request):
|
|
"""
|
|
Return True of the request is being made via the GraphQL API.
|
|
"""
|
|
return request.path_info == reverse('graphql') and request.content_type == HTTP_CONTENT_TYPE_JSON
|
|
|
|
|
|
def get_view_name(view):
|
|
"""
|
|
Derive the view name from its associated model, if it has one. Fall back to DRF's built-in `get_view_name()`.
|
|
This function is provided to DRF as its VIEW_NAME_FUNCTION.
|
|
"""
|
|
if hasattr(view, 'queryset') and view.queryset is not None:
|
|
# Derive the model name from the queryset.
|
|
name = title(view.queryset.model._meta.verbose_name)
|
|
if suffix := getattr(view, 'suffix', None):
|
|
name = f'{name} {suffix}'
|
|
return name
|
|
|
|
# Fall back to DRF's default behavior
|
|
return drf_get_view_name(view)
|
|
|
|
|
|
def _get_nested_serializer(serializer_field):
|
|
"""
|
|
Return the nested serializer instance for a declared serializer field.
|
|
"""
|
|
if isinstance(serializer_field, ListSerializer):
|
|
serializer_field = serializer_field.child
|
|
|
|
if isinstance(serializer_field, Serializer) and hasattr(serializer_field, 'nested'):
|
|
return serializer_field
|
|
|
|
return None
|
|
|
|
|
|
def _get_serializer_fields(serializer: Serializer):
|
|
"""
|
|
Return the effective field names for a serializer instance, honoring any
|
|
field-level fields=/omit= overrides.
|
|
"""
|
|
fields = getattr(serializer, '_include_fields', None) or serializer.Meta.fields
|
|
omit = getattr(serializer, '_omit_fields', []) or []
|
|
|
|
return [field_name for field_name in fields if field_name not in omit]
|
|
|
|
|
|
def get_prefetches_for_serializer(serializer_class, fields=None, omit=None):
|
|
"""
|
|
Compile and return a list of fields which should be prefetched on the queryset for a serializer.
|
|
"""
|
|
if fields is not None and omit is not None:
|
|
raise TypeError("Cannot specify both 'fields' and 'omit' parameters.")
|
|
|
|
model = serializer_class.Meta.model
|
|
|
|
# If fields are not specified, default to all
|
|
fields_to_include = fields or serializer_class.Meta.fields
|
|
fields_to_omit = omit or []
|
|
|
|
prefetch_fields = []
|
|
for field_name in fields_to_include:
|
|
if field_name in fields_to_omit:
|
|
continue
|
|
serializer_field = serializer_class._declared_fields.get(field_name)
|
|
|
|
# Determine the name of the model field referenced by the serializer field
|
|
model_field_name = field_name
|
|
if serializer_field and getattr(serializer_field, 'source', None):
|
|
model_field_name = serializer_field.source
|
|
|
|
# If the serializer field does not map to a discrete model field, skip it.
|
|
try:
|
|
field = model._meta.get_field(model_field_name)
|
|
if isinstance(field, (RelatedField, ManyToOneRel, GenericForeignKey)):
|
|
prefetch_fields.append(field.name)
|
|
except FieldDoesNotExist:
|
|
continue
|
|
|
|
# If this field is represented by a nested serializer, recurse to resolve
|
|
# prefetches for the related object, honoring any field-level fields=/omit=
|
|
# constraints set on that serializer field instance.
|
|
if nested_serializer := _get_nested_serializer(serializer_field):
|
|
subfields = _get_serializer_fields(nested_serializer)
|
|
for subfield in get_prefetches_for_serializer(type(nested_serializer), fields=subfields):
|
|
prefetch_fields.append(f'{field.name}__{subfield}')
|
|
|
|
return prefetch_fields
|
|
|
|
|
|
def get_annotations_for_serializer(serializer_class, fields=None, omit=None):
|
|
"""
|
|
Return a mapping of field names to annotations to be applied to the queryset for a serializer.
|
|
"""
|
|
if fields is not None and omit is not None:
|
|
raise TypeError("Cannot specify both 'fields' and 'omit' parameters.")
|
|
|
|
model = serializer_class.Meta.model
|
|
|
|
# If fields are not specified, default to all
|
|
fields_to_include = fields or serializer_class.Meta.fields
|
|
fields_to_omit = omit or []
|
|
|
|
annotations = {}
|
|
for field_name, field in serializer_class._declared_fields.items():
|
|
if field_name in fields_to_omit:
|
|
continue
|
|
if field_name in fields_to_include and type(field) is RelatedObjectCountField:
|
|
related_field = getattr(model, field.relation).field
|
|
annotations[field_name] = count_related(related_field.model, related_field.name)
|
|
|
|
return annotations
|
|
|
|
|
|
def get_related_object_by_attrs(queryset, attrs):
|
|
"""
|
|
Return an object identified by either a dictionary of attributes or its numeric primary key (ID). This is used
|
|
for referencing related objects when creating/updating objects via the REST API.
|
|
"""
|
|
if attrs is None:
|
|
return None
|
|
|
|
# Dictionary of related object attributes
|
|
if isinstance(attrs, dict):
|
|
params = dict_to_filter_params(attrs)
|
|
try:
|
|
return queryset.get(**params)
|
|
except ObjectDoesNotExist:
|
|
raise ValidationError(
|
|
_("Related object not found using the provided attributes: {params}").format(params=params))
|
|
except MultipleObjectsReturned:
|
|
raise ValidationError(
|
|
_("Multiple objects match the provided attributes: {params}").format(params=params)
|
|
)
|
|
except FieldError as e:
|
|
raise ValidationError(e)
|
|
|
|
# Integer PK of related object
|
|
try:
|
|
# Cast as integer in case a PK was mistakenly sent as a string
|
|
pk = int(attrs)
|
|
except (TypeError, ValueError):
|
|
raise ValidationError(
|
|
_(
|
|
"Related objects must be referenced by numeric ID or by dictionary of attributes. Received an "
|
|
"unrecognized value: {value}"
|
|
).format(value=attrs)
|
|
)
|
|
|
|
# Look up object by PK
|
|
try:
|
|
return queryset.get(pk=pk)
|
|
except ObjectDoesNotExist:
|
|
raise ValidationError(_("Related object not found using the provided numeric ID: {id}").format(id=pk))
|