mirror of
https://github.com/netbox-community/netbox.git
synced 2026-03-31 14:43:36 +02:00
change to use Script api endpoint
This commit is contained in:
@@ -19,6 +19,7 @@ __all__ = (
|
||||
|
||||
|
||||
class ScriptModuleSerializer(ValidatedModelSerializer):
|
||||
url = None
|
||||
data_source = DataSourceSerializer(nested=True, required=False, allow_null=True)
|
||||
data_file = DataFileSerializer(nested=True, required=False, allow_null=True)
|
||||
upload_file = serializers.FileField(write_only=True, required=False, allow_null=True)
|
||||
@@ -27,11 +28,11 @@ class ScriptModuleSerializer(ValidatedModelSerializer):
|
||||
class Meta:
|
||||
model = ScriptModule
|
||||
fields = [
|
||||
'id', 'url', 'display', 'file_path', 'upload_file',
|
||||
'id', 'display', 'file_path', 'upload_file',
|
||||
'data_source', 'data_file', 'auto_sync_enabled',
|
||||
'created', 'last_updated',
|
||||
]
|
||||
brief_fields = ('id', 'url', 'display')
|
||||
brief_fields = ('id', 'display')
|
||||
|
||||
def validate(self, data):
|
||||
upload_file = data.pop('upload_file', None)
|
||||
|
||||
@@ -26,7 +26,6 @@ router.register('journal-entries', views.JournalEntryViewSet)
|
||||
router.register('config-contexts', views.ConfigContextViewSet)
|
||||
router.register('config-context-profiles', views.ConfigContextProfileViewSet)
|
||||
router.register('config-templates', views.ConfigTemplateViewSet)
|
||||
router.register('script-modules', views.ScriptModuleViewSet)
|
||||
router.register('scripts', views.ScriptViewSet, basename='script')
|
||||
|
||||
app_name = 'extras-api'
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from django.http import Http404
|
||||
from django.shortcuts import get_object_or_404
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from django_rq.queues import get_connection
|
||||
from drf_spectacular.utils import extend_schema, extend_schema_view
|
||||
from rest_framework import status
|
||||
@@ -260,21 +261,12 @@ class ConfigTemplateViewSet(SyncedDataMixin, ConfigTemplateRenderMixin, NetBoxMo
|
||||
return self.render_configtemplate(request, configtemplate, context)
|
||||
|
||||
|
||||
#
|
||||
# Script modules
|
||||
#
|
||||
|
||||
class ScriptModuleViewSet(SyncedDataMixin, NetBoxModelViewSet):
|
||||
queryset = ScriptModule.objects.all()
|
||||
serializer_class = serializers.ScriptModuleSerializer
|
||||
filterset_class = filtersets.ScriptModuleFilterSet
|
||||
|
||||
|
||||
#
|
||||
# Scripts
|
||||
#
|
||||
|
||||
@extend_schema_view(
|
||||
create=extend_schema(request=serializers.ScriptModuleSerializer),
|
||||
update=extend_schema(request=serializers.ScriptInputSerializer),
|
||||
partial_update=extend_schema(request=serializers.ScriptInputSerializer),
|
||||
)
|
||||
@@ -291,10 +283,28 @@ class ScriptViewSet(ModelViewSet):
|
||||
super().initial(request, *args, **kwargs)
|
||||
|
||||
# Restrict the view's QuerySet to allow only the permitted objects
|
||||
if request.user.is_authenticated:
|
||||
if request.user.is_authenticated and self.action != 'create':
|
||||
action = 'run' if request.method == 'POST' else 'view'
|
||||
self.queryset = self.queryset.restrict(request.user, action)
|
||||
|
||||
def create(self, request, *args, **kwargs):
|
||||
"""
|
||||
Upload a new Script module (.py file) and return the created ScriptModule.
|
||||
"""
|
||||
if not request.user.has_perm('extras.add_scriptmodule'):
|
||||
raise PermissionDenied(_("This user does not have permission to add script modules."))
|
||||
if not request.user.has_perm('core.add_managedfile'):
|
||||
raise PermissionDenied(_("This user does not have permission to add managed files."))
|
||||
|
||||
serializer = serializers.ScriptModuleSerializer(
|
||||
data=request.data,
|
||||
context={'request': request},
|
||||
)
|
||||
serializer.is_valid(raise_exception=True)
|
||||
serializer.save()
|
||||
|
||||
return Response(serializer.data, status=status.HTTP_201_CREATED)
|
||||
|
||||
def _get_script(self, pk):
|
||||
# If pk is numeric, retrieve script by ID
|
||||
if pk.isnumeric():
|
||||
|
||||
@@ -33,7 +33,6 @@ __all__ = (
|
||||
'NotificationGroupFilterSet',
|
||||
'SavedFilterFilterSet',
|
||||
'ScriptFilterSet',
|
||||
'ScriptModuleFilterSet',
|
||||
'TableConfigFilterSet',
|
||||
'TagFilterSet',
|
||||
'TaggedItemFilterSet',
|
||||
@@ -65,25 +64,6 @@ class ScriptFilterSet(BaseFilterSet):
|
||||
)
|
||||
|
||||
|
||||
@register_filterset
|
||||
class ScriptModuleFilterSet(BaseFilterSet):
|
||||
q = django_filters.CharFilter(
|
||||
method='search',
|
||||
label=_('Search'),
|
||||
)
|
||||
|
||||
class Meta:
|
||||
model = ScriptModule
|
||||
fields = ('id', 'file_path')
|
||||
|
||||
def search(self, queryset, name, value):
|
||||
if not value.strip():
|
||||
return queryset
|
||||
return queryset.filter(
|
||||
Q(file_path__icontains=value)
|
||||
)
|
||||
|
||||
|
||||
@register_filterset
|
||||
class WebhookFilterSet(OwnerFilterMixin, NetBoxModelFilterSet):
|
||||
q = django_filters.CharFilter(
|
||||
|
||||
@@ -1389,43 +1389,11 @@ class NotificationTest(APIViewTestCases.APIViewTestCase):
|
||||
]
|
||||
|
||||
|
||||
class ScriptModuleTest(APITestCase):
|
||||
|
||||
@classmethod
|
||||
def setUpTestData(cls):
|
||||
# Use bulk_create to bypass ScriptModule.save() which tries to sync classes from disk
|
||||
cls.modules = ScriptModule.objects.bulk_create((
|
||||
ScriptModule(
|
||||
file_root=ManagedFileRootPathChoices.SCRIPTS,
|
||||
file_path='module1.py',
|
||||
),
|
||||
ScriptModule(
|
||||
file_root=ManagedFileRootPathChoices.SCRIPTS,
|
||||
file_path='module2.py',
|
||||
),
|
||||
ScriptModule(
|
||||
file_root=ManagedFileRootPathChoices.SCRIPTS,
|
||||
file_path='module3.py',
|
||||
),
|
||||
))
|
||||
class ScriptUploadTest(APITestCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.url_list = reverse('extras-api:scriptmodule-list')
|
||||
|
||||
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
|
||||
def test_list_script_modules(self):
|
||||
response = self.client.get(self.url_list, **self.header)
|
||||
self.assertHttpStatus(response, status.HTTP_200_OK)
|
||||
self.assertEqual(response.data['count'], 3)
|
||||
|
||||
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
|
||||
def test_get_script_module(self):
|
||||
module = self.modules[0]
|
||||
url = reverse('extras-api:scriptmodule-detail', kwargs={'pk': module.pk})
|
||||
response = self.client.get(url, **self.header)
|
||||
self.assertHttpStatus(response, status.HTTP_200_OK)
|
||||
self.assertEqual(response.data['file_path'], module.file_path)
|
||||
self.url_list = reverse('extras-api:script-list')
|
||||
|
||||
def test_upload_script_module_without_permission(self):
|
||||
script_content = b"from extras.scripts import Script\nclass TestScript(Script):\n pass\n"
|
||||
@@ -1439,13 +1407,11 @@ class ScriptModuleTest(APITestCase):
|
||||
self.assertHttpStatus(response, status.HTTP_403_FORBIDDEN)
|
||||
|
||||
def test_upload_script_module(self):
|
||||
# ScriptModule is a proxy of core.ManagedFile; both proxy and concrete model permissions required.
|
||||
# ScriptModule is a proxy of core.ManagedFile; both permissions required.
|
||||
self.add_permissions('extras.add_scriptmodule', 'core.add_managedfile')
|
||||
script_content = b"from extras.scripts import Script\nclass TestScript(Script):\n pass\n"
|
||||
upload_file = SimpleUploadedFile('test_upload.py', script_content, content_type='text/plain')
|
||||
|
||||
mock_storage = MagicMock()
|
||||
|
||||
with patch('extras.api.serializers_.scripts.storages') as mock_storages:
|
||||
mock_storages.create_storage.return_value = mock_storage
|
||||
mock_storages.backends = {'scripts': {}}
|
||||
@@ -1455,7 +1421,6 @@ class ScriptModuleTest(APITestCase):
|
||||
format='multipart',
|
||||
**self.header,
|
||||
)
|
||||
|
||||
self.assertHttpStatus(response, status.HTTP_201_CREATED)
|
||||
self.assertEqual(response.data['file_path'], 'test_upload.py')
|
||||
mock_storage.save.assert_called_once()
|
||||
@@ -1464,39 +1429,3 @@ class ScriptModuleTest(APITestCase):
|
||||
self.add_permissions('extras.add_scriptmodule', 'core.add_managedfile')
|
||||
response = self.client.post(self.url_list, {}, format='json', **self.header)
|
||||
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
|
||||
|
||||
def test_update_script_module_upload(self):
|
||||
self.add_permissions('extras.change_scriptmodule', 'core.change_managedfile')
|
||||
module = self.modules[0]
|
||||
url = reverse('extras-api:scriptmodule-detail', kwargs={'pk': module.pk})
|
||||
script_content = b"from extras.scripts import Script\nclass UpdatedScript(Script):\n pass\n"
|
||||
upload_file = SimpleUploadedFile('updated_script.py', script_content, content_type='text/plain')
|
||||
|
||||
mock_storage = MagicMock()
|
||||
|
||||
with patch('extras.api.serializers_.scripts.storages') as mock_storages:
|
||||
mock_storages.create_storage.return_value = mock_storage
|
||||
mock_storages.backends = {'scripts': {}}
|
||||
response = self.client.patch(
|
||||
url,
|
||||
{'upload_file': upload_file},
|
||||
format='multipart',
|
||||
**self.header,
|
||||
)
|
||||
|
||||
self.assertHttpStatus(response, status.HTTP_200_OK)
|
||||
self.assertEqual(response.data['file_path'], 'updated_script.py')
|
||||
mock_storage.save.assert_called_once()
|
||||
|
||||
def test_delete_script_module(self):
|
||||
self.add_permissions('extras.delete_scriptmodule', 'core.delete_managedfile')
|
||||
module = self.modules[0]
|
||||
url = reverse('extras-api:scriptmodule-detail', kwargs={'pk': module.pk})
|
||||
|
||||
mock_storage = MagicMock()
|
||||
|
||||
with patch.object(ScriptModule, 'storage', new_callable=lambda: property(lambda self: mock_storage)):
|
||||
response = self.client.delete(url, **self.header)
|
||||
|
||||
self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT)
|
||||
self.assertFalse(ScriptModule.objects.filter(pk=module.pk).exists())
|
||||
|
||||
Reference in New Issue
Block a user