diff --git a/netbox/extras/api/serializers_/scripts.py b/netbox/extras/api/serializers_/scripts.py index a7d5b9c2a..6995eb480 100644 --- a/netbox/extras/api/serializers_/scripts.py +++ b/netbox/extras/api/serializers_/scripts.py @@ -1,19 +1,75 @@ +from django.core.files.storage import storages from django.utils.translation import gettext as _ from drf_spectacular.utils import extend_schema_field from rest_framework import serializers +from core.api.serializers_.data import DataFileSerializer, DataSourceSerializer from core.api.serializers_.jobs import JobSerializer -from extras.models import Script +from core.choices import ManagedFileRootPathChoices +from extras.models import Script, ScriptModule from netbox.api.serializers import ValidatedModelSerializer from utilities.datetime import local_now __all__ = ( 'ScriptDetailSerializer', 'ScriptInputSerializer', + 'ScriptModuleSerializer', 'ScriptSerializer', ) +class ScriptModuleSerializer(ValidatedModelSerializer): + data_source = DataSourceSerializer(nested=True, required=False, allow_null=True) + data_file = DataFileSerializer(nested=True, required=False, allow_null=True) + upload_file = serializers.FileField(write_only=True, required=False, allow_null=True) + file_path = serializers.CharField(read_only=True) + + class Meta: + model = ScriptModule + fields = [ + 'id', 'url', 'display', 'file_path', 'upload_file', + 'data_source', 'data_file', 'auto_sync_enabled', + 'created', 'last_updated', + ] + brief_fields = ('id', 'url', 'display') + + def validate(self, data): + upload_file = data.pop('upload_file', None) + # ScriptModule.save() sets file_root; inject it here so full_clean() succeeds + data['file_root'] = ManagedFileRootPathChoices.SCRIPTS + data = super().validate(data) + data.pop('file_root', None) + if upload_file is not None: + data['upload_file'] = upload_file + + if upload_file and data.get('data_file'): + raise serializers.ValidationError( + _("Cannot upload a file and sync from an existing data file.") + ) + if self.instance is None and not upload_file and not data.get('data_file'): + raise serializers.ValidationError( + _("Must upload a file or select a data file to sync.") + ) + + return data + + def create(self, validated_data): + upload_file = validated_data.pop('upload_file', None) + if upload_file: + storage = storages.create_storage(storages.backends["scripts"]) + storage.save(upload_file.name, upload_file) + validated_data['file_path'] = upload_file.name + return super().create(validated_data) + + def update(self, instance, validated_data): + upload_file = validated_data.pop('upload_file', None) + if upload_file: + storage = storages.create_storage(storages.backends["scripts"]) + storage.save(upload_file.name, upload_file) + validated_data['file_path'] = upload_file.name + return super().update(instance, validated_data) + + class ScriptSerializer(ValidatedModelSerializer): description = serializers.SerializerMethodField(read_only=True) vars = serializers.SerializerMethodField(read_only=True) diff --git a/netbox/extras/api/urls.py b/netbox/extras/api/urls.py index 9478fbeb2..77fafb4ca 100644 --- a/netbox/extras/api/urls.py +++ b/netbox/extras/api/urls.py @@ -26,6 +26,7 @@ router.register('journal-entries', views.JournalEntryViewSet) router.register('config-contexts', views.ConfigContextViewSet) router.register('config-context-profiles', views.ConfigContextProfileViewSet) router.register('config-templates', views.ConfigTemplateViewSet) +router.register('script-modules', views.ScriptModuleViewSet) router.register('scripts', views.ScriptViewSet, basename='script') app_name = 'extras-api' diff --git a/netbox/extras/api/views.py b/netbox/extras/api/views.py index e72ad1ab5..429a6f058 100644 --- a/netbox/extras/api/views.py +++ b/netbox/extras/api/views.py @@ -260,6 +260,16 @@ class ConfigTemplateViewSet(SyncedDataMixin, ConfigTemplateRenderMixin, NetBoxMo return self.render_configtemplate(request, configtemplate, context) +# +# Script modules +# + +class ScriptModuleViewSet(SyncedDataMixin, NetBoxModelViewSet): + queryset = ScriptModule.objects.all() + serializer_class = serializers.ScriptModuleSerializer + filterset_class = filtersets.ScriptModuleFilterSet + + # # Scripts # diff --git a/netbox/extras/filtersets.py b/netbox/extras/filtersets.py index 2b995e52a..2c9c519b5 100644 --- a/netbox/extras/filtersets.py +++ b/netbox/extras/filtersets.py @@ -33,6 +33,7 @@ __all__ = ( 'NotificationGroupFilterSet', 'SavedFilterFilterSet', 'ScriptFilterSet', + 'ScriptModuleFilterSet', 'TableConfigFilterSet', 'TagFilterSet', 'TaggedItemFilterSet', @@ -64,6 +65,25 @@ class ScriptFilterSet(BaseFilterSet): ) +@register_filterset +class ScriptModuleFilterSet(BaseFilterSet): + q = django_filters.CharFilter( + method='search', + label=_('Search'), + ) + + class Meta: + model = ScriptModule + fields = ('id', 'file_path') + + def search(self, queryset, name, value): + if not value.strip(): + return queryset + return queryset.filter( + Q(file_path__icontains=value) + ) + + @register_filterset class WebhookFilterSet(OwnerFilterMixin, NetBoxModelFilterSet): q = django_filters.CharFilter( diff --git a/netbox/extras/tests/test_api.py b/netbox/extras/tests/test_api.py index 1c4996bcc..e4b93bb1f 100644 --- a/netbox/extras/tests/test_api.py +++ b/netbox/extras/tests/test_api.py @@ -1,7 +1,11 @@ import datetime import hashlib +from io import BytesIO +from unittest.mock import MagicMock, patch from django.contrib.contenttypes.models import ContentType +from django.core.files.uploadedfile import SimpleUploadedFile +from django.test import override_settings from django.urls import reverse from django.utils.timezone import make_aware, now from rest_framework import status @@ -1384,3 +1388,93 @@ class NotificationTest(APIViewTestCases.APIViewTestCase): 'event_type': OBJECT_DELETED, }, ] + + +class ScriptModuleTest(APITestCase): + + @classmethod + def setUpTestData(cls): + # Use bulk_create to bypass ScriptModule.save() which tries to sync classes from disk + cls.modules = ScriptModule.objects.bulk_create(( + ScriptModule( + file_root=ManagedFileRootPathChoices.SCRIPTS, + file_path='module1.py', + ), + ScriptModule( + file_root=ManagedFileRootPathChoices.SCRIPTS, + file_path='module2.py', + ), + ScriptModule( + file_root=ManagedFileRootPathChoices.SCRIPTS, + file_path='module3.py', + ), + )) + + def setUp(self): + super().setUp() + self.url_list = reverse('extras-api:scriptmodule-list') + + @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) + def test_list_script_modules(self): + response = self.client.get(self.url_list, **self.header) + self.assertHttpStatus(response, status.HTTP_200_OK) + self.assertEqual(response.data['count'], 3) + + @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) + def test_get_script_module(self): + module = self.modules[0] + url = reverse('extras-api:scriptmodule-detail', kwargs={'pk': module.pk}) + response = self.client.get(url, **self.header) + self.assertHttpStatus(response, status.HTTP_200_OK) + self.assertEqual(response.data['file_path'], module.file_path) + + def test_upload_script_module_without_permission(self): + script_content = b"from extras.scripts import Script\nclass TestScript(Script):\n pass\n" + upload_file = SimpleUploadedFile('test_upload.py', script_content, content_type='text/plain') + response = self.client.post( + self.url_list, + {'upload_file': upload_file}, + format='multipart', + **self.header, + ) + self.assertHttpStatus(response, status.HTTP_403_FORBIDDEN) + + def test_upload_script_module(self): + # ScriptModule is a proxy of core.ManagedFile; both proxy and concrete model permissions required. + self.add_permissions('extras.add_scriptmodule', 'core.add_managedfile') + script_content = b"from extras.scripts import Script\nclass TestScript(Script):\n pass\n" + upload_file = SimpleUploadedFile('test_upload.py', script_content, content_type='text/plain') + + mock_storage = MagicMock() + + with patch('extras.api.serializers_.scripts.storages') as mock_storages: + mock_storages.create_storage.return_value = mock_storage + mock_storages.backends = {'scripts': {}} + response = self.client.post( + self.url_list, + {'upload_file': upload_file}, + format='multipart', + **self.header, + ) + + self.assertHttpStatus(response, status.HTTP_201_CREATED) + self.assertEqual(response.data['file_path'], 'test_upload.py') + mock_storage.save.assert_called_once() + + def test_upload_script_module_without_file_fails(self): + self.add_permissions('extras.add_scriptmodule', 'core.add_managedfile') + response = self.client.post(self.url_list, {}, format='json', **self.header) + self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST) + + def test_delete_script_module(self): + self.add_permissions('extras.delete_scriptmodule', 'core.delete_managedfile') + module = self.modules[0] + url = reverse('extras-api:scriptmodule-detail', kwargs={'pk': module.pk}) + + mock_storage = MagicMock() + + with patch.object(ScriptModule, 'storage', new_callable=lambda: property(lambda self: mock_storage)): + response = self.client.delete(url, **self.header) + + self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT) + self.assertFalse(ScriptModule.objects.filter(pk=module.pk).exists()) diff --git a/netbox/netbox/authentication/__init__.py b/netbox/netbox/authentication/__init__.py index 19b1e58ca..b41a1d3ea 100644 --- a/netbox/netbox/authentication/__init__.py +++ b/netbox/netbox/authentication/__init__.py @@ -1,6 +1,7 @@ import logging from collections import defaultdict +from django.apps import apps from django.conf import settings from django.contrib.auth.backends import ModelBackend from django.contrib.auth.backends import RemoteUserBackend as _RemoteUserBackend @@ -137,12 +138,14 @@ class ObjectPermissionMixin: if obj is None: return True - # Sanity check: Ensure that the requested permission applies to the specified object + # Sanity check: Ensure that the requested permission applies to the specified object. + # Also accept permissions for proxy models whose concrete model matches the object's. model = obj._meta.concrete_model if model._meta.label_lower != '.'.join((app_label, model_name)): - raise ValueError(_("Invalid permission {permission} for model {model}").format( - permission=perm, model=model - )) + if apps.get_model(app_label, model_name)._meta.concrete_model != model: + raise ValueError(_("Invalid permission {permission} for model {model}").format( + permission=perm, model=model + )) # Compile a QuerySet filter that matches all instances of the specified model tokens = {