keep only upload functionality

This commit is contained in:
Arthur
2026-03-31 11:56:28 -07:00
parent e8a29f105a
commit 3284c83597
5 changed files with 29 additions and 93 deletions

View File

@@ -31,11 +31,11 @@ class ScriptModuleSerializer(ValidatedModelSerializer):
class Meta: class Meta:
model = ScriptModule model = ScriptModule
fields = [ fields = [
'id', 'url', 'display', 'file_path', 'upload_file', 'id', 'display', 'file_path', 'upload_file',
'data_source', 'data_file', 'auto_sync_enabled', 'data_source', 'data_file', 'auto_sync_enabled',
'created', 'last_updated', 'created', 'last_updated',
] ]
brief_fields = ('id', 'url', 'display') brief_fields = ('id', 'display')
def validate(self, data): def validate(self, data):
upload_file = data.pop('upload_file', None) upload_file = data.pop('upload_file', None)
@@ -112,21 +112,6 @@ class ScriptModuleSerializer(ValidatedModelSerializer):
except Exception: except Exception:
pass pass
def update(self, instance, validated_data):
upload_file = validated_data.pop('upload_file', None)
if upload_file:
self._save_upload(upload_file, validated_data)
elif data_file := validated_data.get('data_file'):
self._sync_data_file(data_file, validated_data)
try:
return super().update(instance, validated_data)
except Exception:
if file_path := validated_data.get('file_path'):
try:
storages.create_storage(storages.backends["scripts"]).delete(file_path)
except Exception:
pass
raise
class ScriptSerializer(ValidatedModelSerializer): class ScriptSerializer(ValidatedModelSerializer):

View File

@@ -5,7 +5,7 @@ from rest_framework import status
from rest_framework.decorators import action from rest_framework.decorators import action
from rest_framework.exceptions import PermissionDenied from rest_framework.exceptions import PermissionDenied
from rest_framework.generics import RetrieveUpdateDestroyAPIView from rest_framework.generics import RetrieveUpdateDestroyAPIView
from rest_framework.mixins import ListModelMixin, RetrieveModelMixin from rest_framework.mixins import CreateModelMixin, ListModelMixin, RetrieveModelMixin
from rest_framework.renderers import JSONRenderer from rest_framework.renderers import JSONRenderer
from rest_framework.response import Response from rest_framework.response import Response
from rest_framework.routers import APIRootView from rest_framework.routers import APIRootView
@@ -263,10 +263,9 @@ class ConfigTemplateViewSet(SyncedDataMixin, ConfigTemplateRenderMixin, NetBoxMo
# Scripts # Scripts
# #
class ScriptModuleViewSet(SyncedDataMixin, NetBoxModelViewSet): class ScriptModuleViewSet(CreateModelMixin, BaseViewSet):
queryset = ScriptModule.objects.all() queryset = ScriptModule.objects.all()
serializer_class = serializers.ScriptModuleSerializer serializer_class = serializers.ScriptModuleSerializer
filterset_class = filtersets.ScriptModuleFilterSet
class ScriptViewSet(ModelViewSet): class ScriptViewSet(ModelViewSet):

View File

@@ -9,12 +9,7 @@ from netbox.filtersets import BaseFilterSet, ChangeLoggedModelFilterSet, NetBoxM
from tenancy.models import Tenant, TenantGroup from tenancy.models import Tenant, TenantGroup
from users.filterset_mixins import OwnerFilterMixin from users.filterset_mixins import OwnerFilterMixin
from users.models import Group, User from users.models import Group, User
from utilities.filters import ( from utilities.filters import MultiValueCharFilter, MultiValueContentTypeFilter, MultiValueNumberFilter
MultiValueCharFilter,
MultiValueContentTypeFilter,
MultiValueDateTimeFilter,
MultiValueNumberFilter,
)
from utilities.filtersets import register_filterset from utilities.filtersets import register_filterset
from virtualization.models import Cluster, ClusterGroup, ClusterType from virtualization.models import Cluster, ClusterGroup, ClusterType
@@ -38,7 +33,6 @@ __all__ = (
'NotificationGroupFilterSet', 'NotificationGroupFilterSet',
'SavedFilterFilterSet', 'SavedFilterFilterSet',
'ScriptFilterSet', 'ScriptFilterSet',
'ScriptModuleFilterSet',
'TableConfigFilterSet', 'TableConfigFilterSet',
'TagFilterSet', 'TagFilterSet',
'TaggedItemFilterSet', 'TaggedItemFilterSet',
@@ -70,26 +64,6 @@ class ScriptFilterSet(BaseFilterSet):
) )
@register_filterset
class ScriptModuleFilterSet(BaseFilterSet):
q = django_filters.CharFilter(
method='search',
label=_('Search'),
)
created = MultiValueDateTimeFilter()
last_updated = MultiValueDateTimeFilter()
class Meta:
model = ScriptModule
fields = ('id', 'file_path', 'created', 'last_updated')
def search(self, queryset, name, value):
if not value.strip():
return queryset
return queryset.filter(
Q(file_path__icontains=value)
)
@register_filterset @register_filterset
class WebhookFilterSet(OwnerFilterMixin, NetBoxModelFilterSet): class WebhookFilterSet(OwnerFilterMixin, NetBoxModelFilterSet):

View File

@@ -278,7 +278,7 @@ class CustomFieldChoiceSetTest(APIViewTestCases.APIViewTestCase):
] ]
} }
response = self.client.post(self._get_list_url(), data, format='json', **self.header) response = self.client.post(self.url_list, data, format='json', **self.header)
self.assertEqual(response.status_code, 400) self.assertEqual(response.status_code, 400)
@@ -1390,6 +1390,19 @@ class NotificationTest(APIViewTestCases.APIViewTestCase):
class ScriptModuleTest(APITestCase): class ScriptModuleTest(APITestCase):
"""
Tests for the /api/extras/script-modules/ endpoint.
ScriptModule is a proxy of core.ManagedFile (a different app) so the standard
APIViewTestCases mixins cannot be used directly:
- GraphQLTestCase: ScriptModule has no GraphQL type.
- CreateObjectViewTestCase: requires multipart file upload, not plain JSON create_data.
- Get/List/Update/DeleteObjectViewTestCase: the mixin's ObjectPermission setup resolves
ScriptModule to ManagedFile's ContentType (core.managedfile), producing a
core.change_managedfile grant. But TokenPermissions checks extras.change_scriptmodule,
causing a 403 despite the ObjectPermission existing.
All tests therefore use add_permissions() with explicit Django model-level permissions.
"""
@classmethod @classmethod
def setUpTestData(cls): def setUpTestData(cls):
@@ -1403,31 +1416,11 @@ class ScriptModuleTest(APITestCase):
hash=hashlib.sha256(script_content).hexdigest(), hash=hashlib.sha256(script_content).hexdigest(),
data=script_content, data=script_content,
) )
# Use bulk_create to bypass ScriptModule.save() which tries to sync classes from disk
cls.modules = ScriptModule.objects.bulk_create((
ScriptModule(file_root=ManagedFileRootPathChoices.SCRIPTS, file_path='module1.py'),
ScriptModule(file_root=ManagedFileRootPathChoices.SCRIPTS, file_path='module2.py'),
ScriptModule(file_root=ManagedFileRootPathChoices.SCRIPTS, file_path='module3.py'),
))
def setUp(self): def setUp(self):
super().setUp() super().setUp()
self.url_list = reverse('extras-api:scriptmodule-list') self.url_list = reverse('extras-api:scriptmodule-list')
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_list_script_modules(self):
response = self.client.get(self.url_list, **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(response.data['count'], 3)
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_get_script_module(self):
module = self.modules[0]
url = reverse('extras-api:scriptmodule-detail', kwargs={'pk': module.pk})
response = self.client.get(url, **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(response.data['file_path'], module.file_path)
def test_upload_script_module_without_permission(self): def test_upload_script_module_without_permission(self):
script_content = b"from extras.scripts import Script\nclass TestScript(Script):\n pass\n" script_content = b"from extras.scripts import Script\nclass TestScript(Script):\n pass\n"
upload_file = SimpleUploadedFile('test_upload.py', script_content, content_type='text/plain') upload_file = SimpleUploadedFile('test_upload.py', script_content, content_type='text/plain')
@@ -1511,25 +1504,3 @@ class ScriptModuleTest(APITestCase):
self.assertEqual(response.data['file_path'], 'test_datasource.py') self.assertEqual(response.data['file_path'], 'test_datasource.py')
self.assertTrue(ScriptModule.objects.filter(file_path='test_datasource.py').exists()) self.assertTrue(ScriptModule.objects.filter(file_path='test_datasource.py').exists())
def test_delete_script_module(self):
"""DELETE removes the ScriptModule and returns 204."""
self.add_permissions('extras.delete_scriptmodule', 'core.delete_managedfile',
'extras.view_scriptmodule')
module = ScriptModule.objects.create(
file_root=ManagedFileRootPathChoices.SCRIPTS, file_path='to_delete.py',
)
url = reverse('extras-api:scriptmodule-detail', kwargs={'pk': module.pk})
response = self.client.delete(url, **self.header)
self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT)
self.assertFalse(ScriptModule.objects.filter(pk=module.pk).exists())
def test_delete_script_module_without_permission(self):
"""DELETE without delete_scriptmodule permission returns 403."""
self.add_permissions('extras.view_scriptmodule')
module = ScriptModule.objects.create(
file_root=ManagedFileRootPathChoices.SCRIPTS, file_path='no_delete.py',
)
url = reverse('extras-api:scriptmodule-detail', kwargs={'pk': module.pk})
response = self.client.delete(url, **self.header)
self.assertHttpStatus(response, status.HTTP_403_FORBIDDEN)
self.assertTrue(ScriptModule.objects.filter(pk=module.pk).exists())

View File

@@ -1,6 +1,7 @@
import logging import logging
from collections import defaultdict from collections import defaultdict
from django.apps import apps
from django.conf import settings from django.conf import settings
from django.contrib.auth.backends import ModelBackend from django.contrib.auth.backends import ModelBackend
from django.contrib.auth.backends import RemoteUserBackend as _RemoteUserBackend from django.contrib.auth.backends import RemoteUserBackend as _RemoteUserBackend
@@ -140,9 +141,15 @@ class ObjectPermissionMixin:
# Sanity check: Ensure that the requested permission applies to the specified object # Sanity check: Ensure that the requested permission applies to the specified object
model = obj._meta.concrete_model model = obj._meta.concrete_model
if model._meta.label_lower != '.'.join((app_label, model_name)): if model._meta.label_lower != '.'.join((app_label, model_name)):
raise ValueError(_("Invalid permission {permission} for model {model}").format( # Allow proxy models: the permission may name a proxy whose concrete model matches
permission=perm, model=model try:
)) perm_model = apps.get_model(app_label, model_name)
except LookupError:
perm_model = None
if perm_model is None or perm_model._meta.concrete_model != model:
raise ValueError(_("Invalid permission {permission} for model {model}").format(
permission=perm, model=model
))
# Compile a QuerySet filter that matches all instances of the specified model # Compile a QuerySet filter that matches all instances of the specified model
tokens = { tokens = {