#21701 allow upload script via API

This commit is contained in:
Arthur
2026-03-25 14:32:47 -07:00
parent 981f31304d
commit 5043b9c96b
6 changed files with 189 additions and 5 deletions

View File

@@ -1,19 +1,75 @@
from django.core.files.storage import storages
from django.utils.translation import gettext as _
from drf_spectacular.utils import extend_schema_field
from rest_framework import serializers
from core.api.serializers_.data import DataFileSerializer, DataSourceSerializer
from core.api.serializers_.jobs import JobSerializer
from extras.models import Script
from core.choices import ManagedFileRootPathChoices
from extras.models import Script, ScriptModule
from netbox.api.serializers import ValidatedModelSerializer
from utilities.datetime import local_now
__all__ = (
'ScriptDetailSerializer',
'ScriptInputSerializer',
'ScriptModuleSerializer',
'ScriptSerializer',
)
class ScriptModuleSerializer(ValidatedModelSerializer):
data_source = DataSourceSerializer(nested=True, required=False, allow_null=True)
data_file = DataFileSerializer(nested=True, required=False, allow_null=True)
upload_file = serializers.FileField(write_only=True, required=False, allow_null=True)
file_path = serializers.CharField(read_only=True)
class Meta:
model = ScriptModule
fields = [
'id', 'url', 'display', 'file_path', 'upload_file',
'data_source', 'data_file', 'auto_sync_enabled',
'created', 'last_updated',
]
brief_fields = ('id', 'url', 'display')
def validate(self, data):
upload_file = data.pop('upload_file', None)
# ScriptModule.save() sets file_root; inject it here so full_clean() succeeds
data['file_root'] = ManagedFileRootPathChoices.SCRIPTS
data = super().validate(data)
data.pop('file_root', None)
if upload_file is not None:
data['upload_file'] = upload_file
if upload_file and data.get('data_file'):
raise serializers.ValidationError(
_("Cannot upload a file and sync from an existing data file.")
)
if self.instance is None and not upload_file and not data.get('data_file'):
raise serializers.ValidationError(
_("Must upload a file or select a data file to sync.")
)
return data
def create(self, validated_data):
upload_file = validated_data.pop('upload_file', None)
if upload_file:
storage = storages.create_storage(storages.backends["scripts"])
storage.save(upload_file.name, upload_file)
validated_data['file_path'] = upload_file.name
return super().create(validated_data)
def update(self, instance, validated_data):
upload_file = validated_data.pop('upload_file', None)
if upload_file:
storage = storages.create_storage(storages.backends["scripts"])
storage.save(upload_file.name, upload_file)
validated_data['file_path'] = upload_file.name
return super().update(instance, validated_data)
class ScriptSerializer(ValidatedModelSerializer):
description = serializers.SerializerMethodField(read_only=True)
vars = serializers.SerializerMethodField(read_only=True)

View File

@@ -26,6 +26,7 @@ router.register('journal-entries', views.JournalEntryViewSet)
router.register('config-contexts', views.ConfigContextViewSet)
router.register('config-context-profiles', views.ConfigContextProfileViewSet)
router.register('config-templates', views.ConfigTemplateViewSet)
router.register('script-modules', views.ScriptModuleViewSet)
router.register('scripts', views.ScriptViewSet, basename='script')
app_name = 'extras-api'

View File

@@ -260,6 +260,16 @@ class ConfigTemplateViewSet(SyncedDataMixin, ConfigTemplateRenderMixin, NetBoxMo
return self.render_configtemplate(request, configtemplate, context)
#
# Script modules
#
class ScriptModuleViewSet(SyncedDataMixin, NetBoxModelViewSet):
queryset = ScriptModule.objects.all()
serializer_class = serializers.ScriptModuleSerializer
filterset_class = filtersets.ScriptModuleFilterSet
#
# Scripts
#

View File

@@ -33,6 +33,7 @@ __all__ = (
'NotificationGroupFilterSet',
'SavedFilterFilterSet',
'ScriptFilterSet',
'ScriptModuleFilterSet',
'TableConfigFilterSet',
'TagFilterSet',
'TaggedItemFilterSet',
@@ -64,6 +65,25 @@ class ScriptFilterSet(BaseFilterSet):
)
@register_filterset
class ScriptModuleFilterSet(BaseFilterSet):
q = django_filters.CharFilter(
method='search',
label=_('Search'),
)
class Meta:
model = ScriptModule
fields = ('id', 'file_path')
def search(self, queryset, name, value):
if not value.strip():
return queryset
return queryset.filter(
Q(file_path__icontains=value)
)
@register_filterset
class WebhookFilterSet(OwnerFilterMixin, NetBoxModelFilterSet):
q = django_filters.CharFilter(

View File

@@ -1,7 +1,11 @@
import datetime
import hashlib
from io import BytesIO
from unittest.mock import MagicMock, patch
from django.contrib.contenttypes.models import ContentType
from django.core.files.uploadedfile import SimpleUploadedFile
from django.test import override_settings
from django.urls import reverse
from django.utils.timezone import make_aware, now
from rest_framework import status
@@ -1384,3 +1388,93 @@ class NotificationTest(APIViewTestCases.APIViewTestCase):
'event_type': OBJECT_DELETED,
},
]
class ScriptModuleTest(APITestCase):
@classmethod
def setUpTestData(cls):
# Use bulk_create to bypass ScriptModule.save() which tries to sync classes from disk
cls.modules = ScriptModule.objects.bulk_create((
ScriptModule(
file_root=ManagedFileRootPathChoices.SCRIPTS,
file_path='module1.py',
),
ScriptModule(
file_root=ManagedFileRootPathChoices.SCRIPTS,
file_path='module2.py',
),
ScriptModule(
file_root=ManagedFileRootPathChoices.SCRIPTS,
file_path='module3.py',
),
))
def setUp(self):
super().setUp()
self.url_list = reverse('extras-api:scriptmodule-list')
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_list_script_modules(self):
response = self.client.get(self.url_list, **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(response.data['count'], 3)
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_get_script_module(self):
module = self.modules[0]
url = reverse('extras-api:scriptmodule-detail', kwargs={'pk': module.pk})
response = self.client.get(url, **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(response.data['file_path'], module.file_path)
def test_upload_script_module_without_permission(self):
script_content = b"from extras.scripts import Script\nclass TestScript(Script):\n pass\n"
upload_file = SimpleUploadedFile('test_upload.py', script_content, content_type='text/plain')
response = self.client.post(
self.url_list,
{'upload_file': upload_file},
format='multipart',
**self.header,
)
self.assertHttpStatus(response, status.HTTP_403_FORBIDDEN)
def test_upload_script_module(self):
# ScriptModule is a proxy of core.ManagedFile; both proxy and concrete model permissions required.
self.add_permissions('extras.add_scriptmodule', 'core.add_managedfile')
script_content = b"from extras.scripts import Script\nclass TestScript(Script):\n pass\n"
upload_file = SimpleUploadedFile('test_upload.py', script_content, content_type='text/plain')
mock_storage = MagicMock()
with patch('extras.api.serializers_.scripts.storages') as mock_storages:
mock_storages.create_storage.return_value = mock_storage
mock_storages.backends = {'scripts': {}}
response = self.client.post(
self.url_list,
{'upload_file': upload_file},
format='multipart',
**self.header,
)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(response.data['file_path'], 'test_upload.py')
mock_storage.save.assert_called_once()
def test_upload_script_module_without_file_fails(self):
self.add_permissions('extras.add_scriptmodule', 'core.add_managedfile')
response = self.client.post(self.url_list, {}, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
def test_delete_script_module(self):
self.add_permissions('extras.delete_scriptmodule', 'core.delete_managedfile')
module = self.modules[0]
url = reverse('extras-api:scriptmodule-detail', kwargs={'pk': module.pk})
mock_storage = MagicMock()
with patch.object(ScriptModule, 'storage', new_callable=lambda: property(lambda self: mock_storage)):
response = self.client.delete(url, **self.header)
self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT)
self.assertFalse(ScriptModule.objects.filter(pk=module.pk).exists())

View File

@@ -1,6 +1,7 @@
import logging
from collections import defaultdict
from django.apps import apps
from django.conf import settings
from django.contrib.auth.backends import ModelBackend
from django.contrib.auth.backends import RemoteUserBackend as _RemoteUserBackend
@@ -137,12 +138,14 @@ class ObjectPermissionMixin:
if obj is None:
return True
# Sanity check: Ensure that the requested permission applies to the specified object
# Sanity check: Ensure that the requested permission applies to the specified object.
# Also accept permissions for proxy models whose concrete model matches the object's.
model = obj._meta.concrete_model
if model._meta.label_lower != '.'.join((app_label, model_name)):
raise ValueError(_("Invalid permission {permission} for model {model}").format(
permission=perm, model=model
))
if apps.get_model(app_label, model_name)._meta.concrete_model != model:
raise ValueError(_("Invalid permission {permission} for model {model}").format(
permission=perm, model=model
))
# Compile a QuerySet filter that matches all instances of the specified model
tokens = {